AIoT: Yolo V5 On Kera
Yolo V5
TensorFlow Keras Implemnation from sratch
Training On Custom Dataset Jupyter Notebook Demo
AIoT - Tuto 08 : Object Detection ¶
Personal Full Yolo V5 On Keras From Scratsch ¶
Custum DataSet / Jupyter Lab Version¶
¶
In the previous tuto we saw how Yolo works from its first version to actual one, and an implementation under Keras/Tensorflow using a modified LongxingTan LongxingTan Github code . In this one I'll buil a full one for a jupyter notebook demo, the aim is to use a standrad tensoflow record TFRecord for real datasets ¶
please refer to previous tuto for datatset collect and preparation
So as usual let's strat by import all what we need ¶
In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
import tensorflow as tf
from skimage.transform import resize
from keras import backend as K
from keras.layers import Input, Lambda, Conv2D, BatchNormalization, LeakyReLU, ZeroPadding2D, UpSampling2D
from keras.models import load_model, Model
from keras.layers.merge import add, concatenate
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from matplotlib import pyplot
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
from matplotlib.patches import Rectangle
%matplotlib inline
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
import xml.etree.ElementTree as ET
from tqdm import tqdm
import random
import shutil
from PIL import Image, ImageDraw
from sklearn.model_selection import train_test_split
from IPython.display import clear_output, display
import cv2
import colorsys
from numpy import asarray
from tensorflow.keras.layers import Layer, Conv2D, BatchNormalization, MaxPool2D
import yaml
import math
#from kyolov5.config import params
np.random.seed(1919)
tf.random.set_seed(1949)
tf.__version__
Out[1]:
'2.8.0'
Global Parameters ¶
In [2]:
params={}
params['log_dir']='kyolov5/mylogs'
params['train_annotations_dir']='data/Road_Sign_Dataset/labels/train'
params['test_annotations_dir']='data/Road_Sign_Dataset/labels/test'
params['val_annotations_dir']='data/Road_Sign_Dataset/labels/val'
params['class_name_dir']='data/Road_Sign_Dataset/labels/Road_Sign_Dataset.names'
params['yaml_dir']='kyolov5/myyolo-m-mish.yaml'
params['checkpoint_dir']='kyolov5/weights'
params['saved_model_dir']='kyolov5/myyolov5'
params['n_epochs']=100
params['batch_size']=32
params['multi_gpus']=False
params['init_learning_rate']=3e-4
params['warmup_learning_rate']=1e-6
params['warmup_epochs']=2
params['img_size']=416
params['mosaic_data']=False
params['augment_data']=True
params['anchor_assign_method']='wh'
params['anchor_positive_augment']=False
params['label_smoothing']=0.04
In [3]:
DATASET_DIR='data/Road_Sign_Dataset'
In [4]:
log_writer = tf.summary.create_file_writer(params['log_dir'])
global_step = tf.Variable(0, trainable=False, dtype=tf.int64)
In [5]:
# Get the annotations
train_annotations = [os.path.join(params['train_annotations_dir'], x) for x in os.listdir(params['train_annotations_dir']) if x[-3:] == "txt"]
print("found :", len(train_annotations), " example :", train_annotations[0])
if 'train' in params['train_annotations_dir']:
print("OK")
tt=train_annotations[0].replace("labels","images").replace("txt","png")
print(tt)
found : 701 example : data/Road_Sign_Dataset/labels/train/road260.txt OK data/Road_Sign_Dataset/images/train/road260.png
In [6]:
class_name_to_id_mapping = {"trafficlight": 0,"stop": 1,"speedlimit": 2,"crosswalk": 3}
class_id_to_name_mapping = dict(zip(class_name_to_id_mapping.values(), class_name_to_id_mapping.keys()))
train_labels = [os.path.join(params['train_annotations_dir'], x) for x in os.listdir(params['train_annotations_dir']) if x[-3:] == "txt"]
val_labels = [os.path.join(params['val_annotations_dir'], x) for x in os.listdir(params['val_annotations_dir']) if x[-3:] == "txt"]
test_labels = [os.path.join(params['test_annotations_dir'], x) for x in os.listdir(params['test_annotations_dir']) if x[-3:] == "txt"]
print("found :", len(train_labels), " for train example :", train_labels[0])
print("found :", len(val_labels), " for val example :", val_labels[0])
print("found :", len(test_labels), " for train example :", test_labels[0])
found : 701 for train example : data/Road_Sign_Dataset/labels/train/road260.txt found : 88 for val example : data/Road_Sign_Dataset/labels/val/road269.txt found : 88 for train example : data/Road_Sign_Dataset/labels/test/road360.txt
Images utils ¶
In [7]:
def xyxy2xywh(box):
y0 = (box[..., 0: 1] + box[..., 2: 3]) / 2. # x center
y1 = (box[...,1: 2] + box[..., 3: 4]) / 2. # y center
y2 = box[..., 2: 3] - box[..., 0: 1] # width
y3 = box[..., 3: 4] - box[..., 1: 2] # height
y = tf.concat([y0, y1, y2, y3], axis=-1) if isinstance(box, tf.Tensor) \
else np.concatenate([y0, y1, y2, y3], axis=-1)
return y
def xywh2xyxy(box):
y0 = box[..., 0: 1] - box[..., 2: 3] / 2 # top left x
y1 = box[..., 1: 2] - box[..., 3: 4] / 2 # top left y
y2 = box[..., 0: 1] + box[..., 2: 3] / 2 # bottom right x
y3 = box[..., 1: 2] + box[..., 3: 4] / 2 # bottom right y
y = tf.concat([y0, y1, y2, y3], axis=-1) if isinstance(box, tf.Tensor) else np.concatenate([y0, y1, y2, y3], axis=-1)
return y
def box_iou(box1, box2, broadcast=True):
# input: xywh, n * 4, m * 4
# output: n * m
if broadcast:
box1 = tf.expand_dims(box1, 1) # n * 1 * 4
box2 = tf.expand_dims(box2, 0) # 1 * m * 4
boxes1_area = box1[..., 2] * box1[..., 3]
boxes2_area = box2[..., 2] * box2[..., 3]
box1 = tf.concat([box1[..., :2] - box1[..., 2:] * 0.5,
box1[..., :2] + box1[..., 2:] * 0.5], axis=-1) # xmin, ymin, xmax, ymax
box2 = tf.concat([box2[..., :2] - box2[..., 2:] * 0.5,
box2[..., :2] + box2[..., 2:] * 0.5], axis=-1)
left_up = tf.maximum(box1[..., :2], box2[..., :2])
right_down = tf.minimum(box1[..., 2:], box2[..., 2:])
inter_section = tf.maximum(right_down - left_up, 1e-6)
inter_area = inter_section[..., 0] * inter_section[..., 1]
union_area = boxes1_area + boxes2_area - inter_area + 1e-9
iou = 1.0 * inter_area / union_area
return iou
def load_mosaic_image_jl(index, mosaic_border, image_target_size, images_dir, labels):
#print(" load mosaique")
# labels style: pixel or norm
# labels output: pixel
max_index = len(labels) - 1
indices = [index] + [random.randint(0, max_index) for _ in range(3)]
yc, xc = [int(random.uniform(-i, 2 * image_target_size + i)) for i in mosaic_border] # mosaic center x, y
label_mosaic = []
for i, index in enumerate(indices):
img_dir = images_dir[index]
img = cv2.imread(img_dir)
label = labels[index].copy()
h_origin, w_origin, _ = img.shape
img = resize_image_jl(img, target_sizes=image_target_size, keep_ratio=False)
h, w, _ = img.shape
if i == 0: # top left
img_mosaic = np.full((image_target_size * 2, image_target_size * 2, 3), 128,
dtype=np.uint8) # base image with 4 tiles
x1a, y1a, x2a, y2a = max(xc - w, 0), max(yc - h, 0), xc, yc
x1b, y1b, x2b, y2b = w - (x2a - x1a), h - (y2a - y1a), w, h
elif i == 1: # top right
x1a, y1a, x2a, y2a = xc, max(yc - h, 0), min(xc + w, image_target_size * 2), yc
x1b, y1b, x2b, y2b = 0, h - (y2a - y1a), min(w, x2a - x1a), h
elif i == 2: # bottom left
x1a, y1a, x2a, y2a = max(xc - w, 0), yc, xc, min(image_target_size * 2, yc + h)
x1b, y1b, x2b, y2b = w - (x2a - x1a), 0, max(xc, w), min(y2a - y1a, h)
elif i == 3: # bottom right
x1a, y1a, x2a, y2a = xc, yc, min(xc + w, image_target_size * 2), min(image_target_size * 2, yc + h)
x1b, y1b, x2b, y2b = 0, 0, min(w, x2a - x1a), min(y2a - y1a, h)
img_mosaic[y1a:y2a, x1a:x2a] = img[y1b:y2b, x1b:x2b]
padw = x1a - x1b
padh = y1a - y1b
label_new = label.copy()
if label.size > 0:
if np.max(label_new[:, 0:4]) > 1: # if label is pixel, [0, size]
label_new[:, [0, 2]] = label_new[:, [0, 2]]/ w_origin * w + padw
label_new[:, [1, 3]] = label_new[:, [1, 3]]/ h_origin * h + padh
else: # if label is normed, [0, 1]
label_new[:, [0, 2]] = label_new[:, [0, 2]] * w + padw
label_new[:, [1, 3]] = label_new[:, [1, 3]] * h + padh
label_mosaic.append(label_new)
if len(label_mosaic):
label_mosaic = np.concatenate(label_mosaic, 0)
label_mosaic[:, :4] = np.clip(label_mosaic[:, :4], 0, 2 * image_target_size)
img_mosaic, label_mosaic = random_perspective(img_mosaic, label=label_mosaic, border=mosaic_border)
return img_mosaic, label_mosaic
def random_perspective_jl(img, label=(), degrees=10, translate=.1, scale=.1, shear=10, perspective=0.0, border=(0, 0)):
# labels style: pixel, [xyxy, cls]
#print(" mosaique " ,img.dtype, "label =",label)
img = img.astype(np.uint8)
height = img.shape[0] + border[0] * 2 # shape(h,w,c)
width = img.shape[1] + border[1] * 2
height,width,_=img.shape
# Center
C = np.eye(3)
C[0, 2] = -img.shape[1] / 2 # x translation (pixels)
C[1, 2] = -img.shape[0] / 2 # y translation (pixels)
# Perspective
P = np.eye(3)
P[2, 0] = random.uniform(-perspective, perspective) # x perspective (about y)
P[2, 1] = random.uniform(-perspective, perspective) # y perspective (about x)
# Rotation and Scale
R = np.eye(3)
a = random.uniform(-degrees, degrees)
# a += random.choice([-180, -90, 0, 90]) # add 90deg rotations to small rotations
s = random.uniform(1 - scale, 1 + scale)
# s = 2 ** random.uniform(-scale, scale)
R[:2] = cv2.getRotationMatrix2D(angle=a, center=(0, 0), scale=s)
# Shear
S = np.eye(3)
S[0, 1] = math.tan(random.uniform(-shear, shear) * math.pi / 180) # x shear (deg)
S[1, 0] = math.tan(random.uniform(-shear, shear) * math.pi / 180) # y shear (deg)
# Translation
T = np.eye(3)
T[0, 2] = random.uniform(0.5 - translate, 0.5 + translate) * width # x translation (pixels)
T[1, 2] = random.uniform(0.5 - translate, 0.5 + translate) * height # y translation (pixels)
# Combined rotation matrix
M = T @ S @ R @ P @ C # order of operations (right to left) is IMPORTANT
if (border[0] != 0) or (border[1] != 0) or (M != np.eye(3)).any(): # image changed
if perspective:
img = cv2.warpPerspective(img, M, dsize=(width, height), borderValue=(114, 114, 114))
else: # affine
img = cv2.warpAffine(img, M[:2], dsize=(width, height), borderValue=(114, 114, 114))
# Transform label coordinates
n = len(label)
if n:
for j in label:
j[0:4]=xywh2xyxy(j[0:4])
if np.max(label[:, 0:4]) <= 1.0: # transfer to pixel level
#print("transfer to pixel level in modaique" ,label, img.shape[1],img.shape[0])
label[:, [0, 2]] = label[:, [0, 2]] * img.shape[1]
label[:, [1, 3]] = label[:, [1, 3]] * img.shape[0]
#print(" after ", label)
assert np.max(label[:, 0:4]) > 1, "don't use norm box coordinates here"
# warp points
xy = np.ones((n * 4, 3))
xy[:, :2] = label[:, [0, 1, 2, 3, 0, 3, 2, 1]].reshape(n * 4, 2) # x1y1, x2y2, x1y2, x2y1
xy = (xy @ M.T)[:, :2].reshape(n, 8)
# create new boxes
x = xy[:, [0, 2, 4, 6]]
y = xy[:, [1, 3, 5, 7]]
xy = np.concatenate((x.min(1), y.min(1), x.max(1), y.max(1))).reshape(4, n).T
# reject warped points outside of image
xy[:, [0, 2]] = xy[:, [0, 2]].clip(0, width)
xy[:, [1, 3]] = xy[:, [1, 3]].clip(0, height)
w = xy[:, 2] - xy[:, 0]
h = xy[:, 3] - xy[:, 1]
area = w * h
area0 = (label[:, 2] - label[:, 0]) * (label[:, 3] - label[:, 1])
ar = np.maximum(w / (h + 1e-16), h / (w + 1e-16)) # aspect ratio
i = (w > 2) & (h > 2) & (area / (area0 * scale + 1e-16) > 0.2) & (ar < 20)
label = label[i]
label[:, 0:4] = xy[i]
if label.size == 0: # in case, all labels is out
label = np.array([[0, 0, 0, 0, 0]], np.float32)
for jj in label:
jj[0:4]=xyxy2xywh(jj[0:4])
return img, label
def augment_hsv_jl(img, hgain=0.5, sgain=0.5, vgain=0.5):
#print("aygment $$$$$",img.dtype)
rand = np.random.uniform(-1, 1, 3) * [hgain, sgain, vgain] + 1
hue, sat, val = cv2.split(cv2.cvtColor(img, cv2.COLOR_BGR2HSV))
dtype = img.dtype
x = np.arange(0, 256, dtype=np.int16)
lut_hue = ((x * rand[0]) % 180).astype(dtype)
lut_sat = np.clip(x * rand[1], 0, 255).astype(dtype)
lut_val = np.clip(x * rand[2], 0, 255).astype(dtype)
img_hsv = cv2.merge((cv2.LUT(hue, lut_hue), cv2.LUT(sat, lut_sat), cv2.LUT(val, lut_val))).astype(dtype)
return cv2.cvtColor(img_hsv, cv2.COLOR_HSV2BGR)
def random_flip_jl(img, labels=None):
#print("flip ==",type(img),img.dtype, "labels ==",labels)
# Please note the labels should be normalized into [0, 1]
# assert np.max(labels) <= 1, "The flip labels should be normalized [0, 1]"
if np.max(labels[:, 0:4]) > 1: # transfer to pixel level
#print(" # transfer to pixel level")
labels[:, [0, 2]] = labels[:, [0, 2]] / img.shape[1]
labels[:, [1, 3]] = labels[:, [1, 3]] / img.shape[0]
#labels=xywh2xyxy(labels)
#for k in labels:
#k[1:5]=xywh2xyxy(k[1:5])
lr_flip = True
if lr_flip and random.random() < 0.5:
img = np.fliplr(img)
if labels is not None:
labels[:, [0]] = 1 - labels[:, [0]]
'''ud_flip = False
if ud_flip and random.random() < 0.5:
img = np.flipud(img)
print("--------- ud")
if labels is not None:
labels[:, [1, 2]] = 1 - labels[:, [1, 2]]'''
#labels=xyxy2xywh(labels)
#for mm in labels:
#mm[1:5]=xywh2xyxy(mm[1:5])
return img, labels
In [8]:
def load_image_jl(idx):
img_dir=idx.replace("labels","images").replace("txt","png")
#print(img_dir)
img = cv2.imread(img_dir)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
#img = Image.open(img_dir)
return img
def load_labels_jl(idx):
#label = np.array(idx)
label = np.loadtxt(idx)
if len(label.shape)==1:
label=label[np.newaxis,:]
label=label[:,[1,2,3,4,0]]
return label
def load_image_and_labels_jl(idx):
img=load_image_jl(idx)
label=load_labels_jl(idx)
return img,label
def plot_bounding_box(im, annotations):
#annotations = np.array(annotation_list)
#print("annotations==", annotations)
image=im.copy()
h,w,c = image.shape
print(w,h)
transformed_annotations = np.copy(annotations)
transformed_annotations[:,[0,2]] = annotations[:,[0,2]] * w
transformed_annotations[:,[1,3]] = annotations[:,[1,3]] * h
transformed_annotations[:,0] = transformed_annotations[:,0] - (transformed_annotations[:,2] / 2)
transformed_annotations[:,1] = transformed_annotations[:,1] - (transformed_annotations[:,3] / 2)
transformed_annotations[:,2] = transformed_annotations[:,0] + transformed_annotations[:,2]
transformed_annotations[:,3] = transformed_annotations[:,1] + transformed_annotations[:,3]
# Blue color in BGR
color = (255, 0, 0)
# Line thickness of 2 px
thickness = 1
# font
font = cv2.FONT_HERSHEY_SIMPLEX
# fontScale
fontScale = 0.3
for ann in transformed_annotations:
x0, y0, x1, y1 , obj_cls= ann.astype(int)
#print("!!!!!!!!", x0, y0, x1, y1 , obj_cls)
#image.rectangle(((x0,y0), (x1,y1)))
image=cv2.rectangle(image,(x0,y0), (x1,y1),color,thickness)
image=cv2.putText(image, class_id_to_name_mapping[(int(obj_cls))]+' '+str(int(obj_cls)) , (x0, y0 - 10), font, fontScale, color, 1, cv2.LINE_AA)
#plotted_image.text((x0, y0 - 10), class_id_to_name_mapping[(int(obj_cls))]+' '+str(int(obj_cls)) )
plt.figure(figsize=(10,10))
plt.imshow(np.array(image))
plt.show()
In [9]:
def resize_image_jl(img, target_sizes, keep_ratio=True, label=None):
# Please Note: label style should be normalized xyxy, otherwise need modify
# if keep_ratio is True, letterbox using padding
if not isinstance(target_sizes, (list, set, tuple)):
target_sizes = [target_sizes, target_sizes]
target_h, target_w = target_sizes
h, w, _ = img.shape
scale_h= target_h / h
scale_w = target_w / w
scale = min(scale_h,scale_w )
temp_h, temp_w = int(scale * h), int(scale * w)
image_resize = cv2.resize(img, (temp_w, temp_h))
if keep_ratio:
image_new = np.full(shape=(target_h, target_w, 3), fill_value=0.5)
delta_h, delta_w = (target_h - temp_h) // 2, (target_w - temp_w) // 2
image_new[delta_h: delta_h + temp_h, delta_w: delta_w + temp_w, :] = image_resize
if delta_w !=0:
xn=delta_w/target_w
else:
xn=0
if delta_h != 0 :
yn=delta_h/target_h
else:
yn=0
#print("scale=",scale_h,scale_w,delta_h,delta_w, "llllllllll",xn,yn)
if label is not None:
#label[:, [0,1,2,3]] = xywh2xyxy(label[:, [0,1,2,3]])
label[:, [0]] = (label[:, [0]] * scale * w + delta_w) / target_w
label[:, [1]] = (label[:, [1]] * scale * h + delta_h) / target_h
#label[:, [2]] = (label[:, [2]] * scale * w + delta_w) / target_w
#label[:, [3]] = (label[:, [3]] * scale * h + delta_h) / target_h
if delta_w!=0:
label[:, [2]] = (label[:, [2]] * (1-xn))
if delta_h!=0:
label[:, [3]] = (label[:, [3]] * (1-yn) )
a=1
return image_new, label
else:
return image_new
else:
if label is not None:
# it's fine if the label is normalized and the image is cv2.resize directly
return image_resize, label
else:
return image_resize
def resize_back(bboxes, target_sizes, original_shape):
original_h, original_w = original_shape[:2]
resize_ratio = min(target_sizes / original_w, target_sizes / original_h)
dw = (target_sizes - resize_ratio * original_w) / 2
dh = (target_sizes - resize_ratio * original_h) / 2
bboxes[:, [0, 2]] = 1.0 * (bboxes[:, [0, 2]] - dw) / resize_ratio
bboxes[:, [1, 3]] = 1.0 * (bboxes[:, [1, 3]] - dh) / resize_ratio
return bboxes
In [10]:
def transforms_jl(img, label, mosaic, augment):
# it's also easy to use albumentations here
if augment:
if not mosaic:
img, label = random_perspective_jl(img, label)
img = augment_hsv_jl(img)
if augment: # flip the data if it helps
img, label = random_flip_jl(img, label)
img = img / 255. # normalize the image
if np.max(label[:, 0:4]) > 1: # normalize the bbox
print("Normalizing bbox +++++++++++++++++++")
label[:, [0, 2]] = label[:, [0, 2]] / img.shape[1]
label[:, [1, 3]] = label[:, [1, 3]] / img.shape[0]
## ==> !! label[:, [0, 4]]=xywh2xyxy(label[:, [0, 4]])
return img, label
Let's test on some images ¶
In [11]:
random_file = random.choice(train_labels)
#random_file='data/Road_Sign_Dataset/labels/train/road821.txt'
print(random_file)
img=load_image_jl(random_file)
w,h,c=img.shape
print(img.size, img.shape,w,h,c)
print(img[0][0])
labels=load_labels_jl(random_file)
print("lables ===", labels)
print("img and label")
img,label=load_image_and_labels_jl(random_file)
print("kkkkkkkk",label)
print("lables before=", labels)
i,l= transforms_jl(img, labels, True, False)
print("l=",l)
plt.figure(figsize=(10,10))
plt.imshow(i)
plt.title('Res') #Give this plot a title,
#so I know it's from matplotlib and not cv2
plt.show()
plot_bounding_box(i,l)
img, label = resize_image_jl(i, 416, keep_ratio=True, label=l)
plot_bounding_box(img, label)
'''
plot_bounding_box(img,labels)
im=resize_image(im,416)
plt.figure(figsize=(10,10))
plt.imshow(im)
plt.title('Res') #Give this plot a title,
#so I know it's from matplotlib and not cv2
plt.show()
'''
data/Road_Sign_Dataset/labels/train/road121.txt 361200 (301, 400, 3) 301 400 3 [ 62 102 153] lables === [[0.67 0.457 0.555 0.748 2. ]] img and label kkkkkkkk [[0.67 0.457 0.555 0.748 2. ]] lables before= [[0.67 0.457 0.555 0.748 2. ]] l= [[0.67 0.457 0.555 0.748 2. ]]
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
400 301
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
416 416
Out[11]:
"\nplot_bounding_box(img,labels)\nim=resize_image(im,416)\nplt.figure(figsize=(10,10))\nplt.imshow(im)\nplt.title('Res') #Give this plot a title, \n #so I know it's from matplotlib and not cv2\nplt.show()\n"
Code for drawing resultas after inference ¶
In [12]:
def draw_box(image, label, classes_map=None):
# label: xyxy
box = label[:, 0:4].copy()
classes = label[:, -1]
if np.max(box) <= 1:
box[:, [0, 2]] = box[:, [0, 2]] * image.shape[1]
box[:, [1, 3]] = box[:, [1, 3]] * image.shape[0]
if not isinstance(box, int):
box = box.astype(np.int16)
image_h, image_w, _ = image.shape
num_classes = len(classes_map) if classes_map is not None else len(range(int(np.max(classes)) + 1))
hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)]
colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
colors = list(map(lambda x: (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), colors))
bbox_thick = int(0.6 * (image_h + image_w) / 600)
font_scale = 0.5
for i in range(label.shape[0]):
x1y1 = tuple(box[i, 0:2])
x2y2 = tuple(box[i, 2:4])
class_ind = int(classes[i])
bbox_color = colors[class_ind]
image = cv2.rectangle(image, x1y1, x2y2, bbox_color, bbox_thick)
# show labels
if classes_map is not None:
class_ind = classes_map[class_ind]
else:
class_ind = str(class_ind)
if label.shape[-1] == 6:
score = ': ' + str(round(label[i, -2], 2))
else:
score = ''
bbox_text = '%s %s' % (class_ind, score)
t_size = cv2.getTextSize(bbox_text, 0, font_scale, thickness=bbox_thick//2)[0]
cv2.rectangle(image, x1y1, (x1y1[0] + t_size[0], x1y1[1] - t_size[1] - 3), bbox_color, -1) # filled
cv2.putText(image, bbox_text, (x1y1[0], x1y1[1]-2), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 0), bbox_thick//2, lineType=cv2.LINE_AA)
return image
Build The Model ¶
Loss Function ¶
In [13]:
class MyYoloLoss(object):
def __init__(self, anchors, ignore_iou_threshold, num_classes, img_size, label_smoothing=0):
self.anchors = anchors
self.strides = [8, 16, 32]
self.ignore_iou_threshold = ignore_iou_threshold
self.num_classes = num_classes
self.img_size = img_size
self.bce_conf = tf.keras.losses.BinaryCrossentropy(reduction=tf.keras.losses.Reduction.NONE)
self.bce_class = tf.keras.losses.BinaryCrossentropy(reduction=tf.keras.losses.Reduction.NONE,
label_smoothing=label_smoothing)
def __call__(self, y_true, y_pred):
iou_loss_all = obj_loss_all = class_loss_all = 0
balance = [1.0, 1.0, 1.0] if len(y_pred) == 3 else [4.0, 1.0, 0.4, 0.1] # P3-5 or P3-6
for i, (pred, true) in enumerate(zip(y_pred, y_true)):
# preprocess, true: batch_size * grid * grid * 3 * 6, pred: batch_size * grid * grid * clss+5
true_box, true_obj, true_class = tf.split(true, (4, 1, -1), axis=-1)
pred_box, pred_obj, pred_class = tf.split(pred, (4, 1, -1), axis=-1)
if tf.shape(true_class)[-1] == 1 and self.num_classes > 1:
true_class = tf.squeeze(tf.one_hot(tf.cast(true_class, tf.dtypes.int32), depth=self.num_classes, axis=-1), -2)
# prepare: higher weights to smaller box, true_wh should be normalized to (0,1)
box_scale = 2 - 1.0 * true_box[..., 2] * true_box[..., 3] / (self.img_size ** 2)
obj_mask = tf.squeeze(true_obj, -1) # obj or noobj, batch_size * grid * grid * anchors_per_grid
background_mask = 1.0 - obj_mask
conf_focal = tf.squeeze(tf.math.pow(true_obj - pred_obj, 2), -1)
# iou/ giou/ ciou/ diou loss
iou = bbox_iou(pred_box, true_box, xyxy=False, giou=True)
iou_loss = (1 - iou) * obj_mask * box_scale # batch_size * grid * grid * 3
# confidence loss, Todo: multiply the iou
conf_loss = self.bce_conf(true_obj, pred_obj)
conf_loss = conf_focal * (obj_mask * conf_loss + background_mask * conf_loss) # batch * grid * grid * 3
# class loss
# use binary cross entropy loss for multi class, so every value is independent and sigmoid
# please note that the output of tf.keras.losses.bce is original dim minus the last one
class_loss = obj_mask * self.bce_class(true_class, pred_class)
iou_loss = tf.reduce_mean(tf.reduce_sum(iou_loss, axis=[1, 2, 3]))
conf_loss = tf.reduce_mean(tf.reduce_sum(conf_loss, axis=[1, 2, 3]))
class_loss = tf.reduce_mean(tf.reduce_sum(class_loss, axis=[1, 2, 3]))
iou_loss_all += iou_loss * balance[i]
obj_loss_all += conf_loss * balance[i]
class_loss_all += class_loss * self.num_classes * balance[i] # to balance the 3 loss
try:
print('-'*55, 'iou', tf.reduce_sum(iou_loss_all).numpy(), ', conf', tf.reduce_sum(obj_loss_all).numpy(),
', class', tf.reduce_sum(class_loss_all).numpy())
except: # tf graph mode
pass
return (iou_loss_all, obj_loss_all, class_loss_all)
def bbox_iou(bbox1, bbox2, xyxy=False, giou=False, diou=False, ciou=False, epsilon=1e-9):
assert bbox1.shape == bbox2.shape
# giou loss: https://arxiv.org/abs/1902.09630
if xyxy:
b1x1, b1y1, b1x2, b1y2 = bbox1[..., 0], bbox1[..., 1], bbox1[..., 2], bbox1[..., 3]
b2x1, b2y1, b2x2, b2y2 = bbox2[..., 0], bbox2[..., 1], bbox2[..., 2], bbox2[..., 3]
else: # xywh -> xyxy
b1x1, b1x2 = bbox1[..., 0] - bbox1[..., 2] / 2, bbox1[..., 0] + bbox1[..., 2] / 2
b1y1, b1y2 = bbox1[..., 1] - bbox1[..., 3] / 2, bbox1[..., 1] + bbox1[..., 3] / 2
b2x1, b2x2 = bbox2[..., 0] - bbox2[..., 2] / 2, bbox2[..., 0] + bbox2[..., 2] / 2
b2y1, b2y2 = bbox2[..., 1] - bbox2[..., 3] / 2, bbox2[..., 1] + bbox2[..., 3] / 2
# intersection area
inter = tf.maximum(tf.minimum(b1x2, b2x2) - tf.maximum(b1x1, b2x1), 0) * \
tf.maximum(tf.minimum(b1y2, b2y2) - tf.maximum(b1y1, b2y1), 0)
# union area
w1, h1 = b1x2 - b1x1 + epsilon, b1y2 - b1y1 + epsilon
w2, h2 = b2x2 - b2x1+ epsilon, b2y2 - b2y1 + epsilon
union = w1 * h1 + w2 * h2 - inter + epsilon
# iou
iou = inter / union
if giou or diou or ciou:
# enclosing box
cw = tf.maximum(b1x2, b2x2) - tf.minimum(b1x1, b2x1)
ch = tf.maximum(b1y2, b2y2) - tf.minimum(b1y1, b2y1)
if giou:
enclose_area = cw * ch + epsilon
giou = iou - 1.0 * (enclose_area - union) / enclose_area
return tf.clip_by_value(giou, -1, 1)
if diou or ciou:
c2 = cw ** 2 + ch ** 2 + epsilon
rho2 = ((b2x1 + b2x2) - (b1x1 + b1x2)) ** 2 / 4 + ((b2y1 + b2y2) - (b1y1 + b1y2)) ** 2 / 4
if diou:
return iou - rho2 / c2
elif ciou:
v = (4 / math.pi ** 2) * tf.pow(tf.atan(w2 / h2) - tf.atan(w1 / h1), 2)
alpha = v / (1 - iou + v)
return iou - (rho2 / c2 + v * alpha)
return tf.clip_by_value(iou, 0, 1)
Metrics ¶
In [14]:
def ap_per_class(tp, conf, pred_cls, target_cls, plot=False, save_dir='precision-recall_curve.png', names=[]):
""" Compute the average precision, given the recall and precision curves.
Source: https://github.com/rafaelpadilla/Object-Detection-Metrics.
# Arguments
tp: True positives (nparray, nx1 or nx10).
conf: Objectness value from 0-1 (nparray).
pred_cls: Predicted object classes (nparray).
target_cls: True object classes (nparray).
plot: Plot precision-recall curve at mAP@0.5
save_dir: Plot save directory
# Returns
The average precision as computed in py-faster-rcnn.
"""
# Sort by objectness
i = np.argsort(-conf)
tp, conf, pred_cls = tp[i], conf[i], pred_cls[i]
# Find unique classes
unique_classes = np.unique(target_cls)
# Create Precision-Recall curve and compute AP for each class
px, py = np.linspace(0, 1, 1000), [] # for plotting
pr_score = 0.1 # score to evaluate P and R https://github.com/ultralytics/yolov3/issues/898
s = [unique_classes.shape[0], tp.shape[1]] # number class, number iou thresholds (i.e. 10 for mAP0.5...0.95)
ap, p, r = np.zeros(s), np.zeros(s), np.zeros(s)
for ci, c in enumerate(unique_classes):
i = pred_cls == c
n_l = (target_cls == c).sum() # number of labels
n_p = i.sum() # number of predictions
if n_p == 0 or n_l == 0:
continue
else:
# Accumulate FPs and TPs
fpc = (1 - tp[i]).cumsum(0)
tpc = tp[i].cumsum(0)
# Recall
recall = tpc / (n_l + 1e-16) # recall curve
r[ci] = np.interp(-pr_score, -conf[i], recall[:, 0]) # r at pr_score, negative x, xp because xp decreases
# Precision
precision = tpc / (tpc + fpc) # precision curve
p[ci] = np.interp(-pr_score, -conf[i], precision[:, 0]) # p at pr_score
# AP from recall-precision curve
for j in range(tp.shape[1]):
ap[ci, j], mpre, mrec = compute_ap(recall[:, j], precision[:, j])
if plot and (j == 0):
py.append(np.interp(px, mrec, mpre)) # precision at mAP@0.5
# Compute F1 score (harmonic mean of precision and recall)
f1 = 2 * p * r / (p + r + 1e-16)
return p, r, ap, f1, unique_classes.astype('int32')
def compute_ap(recall, precision):
""" Compute the average precision, given the recall and precision curves
# Arguments
recall: The recall curve (list)
precision: The precision curve (list)
# Returns
Average precision, precision curve, recall curve
"""
# Append sentinel values to beginning and end
mrec = np.concatenate(([0.], recall, [recall[-1] + 0.01]))
mpre = np.concatenate(([1.], precision, [0.]))
# Compute the precision envelope
mpre = np.flip(np.maximum.accumulate(np.flip(mpre)))
# Integrate area under curve
method = 'interp' # methods: 'continuous', 'interp'
if method == 'interp':
x = np.linspace(0, 1, 101) # 101-point interp (COCO)
ap = np.trapz(np.interp(x, mrec, mpre), x) # integrate
else: # 'continuous'
i = np.where(mrec[1:] != mrec[:-1])[0] # points where x axis (recall) changes
ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1]) # area under curve
return ap, mpre, mrec
Module ¶
In [15]:
# === > from tensorflow.keras.layers import Layer, Conv2D, BatchNormalization, MaxPool2D
# from tensorflow.keras.layers import DepthwiseConv2D
# from tensorflow.keras.layers.experimental import SyncBatchNormalization
class Mish(object):
def __call__(self, x):
return x * tf.math.tanh(tf.math.softplus(x))
class Swish(object):
def __call__(self, x):
return tf.nn.swish(x) # tf.nn.leaky_relu(x, alpha=0.1)
class Conv(Layer):
def __init__(self, filters, kernel_size, strides, padding='SAME', groups=1):
super(Conv, self).__init__()
self.conv = Conv2D(filters, kernel_size, strides, padding, groups=groups, use_bias=False,
kernel_initializer=tf.random_normal_initializer(stddev=0.01),
kernel_regularizer=tf.keras.regularizers.L2(5e-4))
self.bn = BatchNormalization()
self.activation = Mish()
def call(self, x):
return self.activation(self.bn(self.conv(x)))
class DWConv(Layer):
def __init__(self, filters, kernel_size, strides):
super(DWConv, self).__init__()
self.conv = Conv(filters, kernel_size, strides, groups=1) # Todo
def call(self, x):
return self.conv(x)
class Focus(Layer):
def __init__(self, filters, kernel_size, strides=1, padding='SAME'):
super(Focus, self).__init__()
self.conv = Conv(filters, kernel_size, strides, padding)
def call(self, x):
return self.conv(tf.concat([x[..., ::2, ::2, :],
x[..., 1::2, ::2, :],
x[..., ::2, 1::2, :],
x[..., 1::2, 1::2, :]],
axis=-1))
class CrossConv(Layer):
def __init__(self, filters, kernel_size, strides=1, groups=1, expansion=1, shortcut=False):
super(CrossConv, self).__init__()
units_e = int(filters * expansion)
self.conv1 = Conv(units_e, (1, kernel_size), (1, strides))
self.conv2 = Conv(filters, (kernel_size, 1), (strides, 1), groups=groups)
self.shortcut = shortcut
def call(self, x):
if self.shortcut:
return x + self.conv2(self.conv1(x))
return self.conv2(self.conv1(x))
class MP(Layer):
# Spatial pyramid pooling layer
def __init__(self, k=2):
super(MP, self).__init__()
self.m = MaxPool2D(pool_size=k, strides=k)
def forward(self, x):
return self.m(x)
class Bottleneck(Layer):
def __init__(self, units, shortcut=True, expansion=0.5):
super(Bottleneck, self).__init__()
self.conv1 = Conv(int(units * expansion), 1, 1)
self.conv2 = Conv(units, 3, 1)
self.shortcut = shortcut
def call(self, x):
if self.shortcut:
return x + self.conv2(self.conv1(x))
return self.conv2(self.conv1(x))
class BottleneckCSP(Layer):
def __init__(self, units, n_layer=1, shortcut=True, expansion=0.5):
super(BottleneckCSP, self).__init__()
units_e = int(units * expansion)
self.conv1 = Conv(units_e, 1, 1)
self.conv2 = Conv2D(units_e, 1, 1, use_bias=False, kernel_initializer=tf.random_normal_initializer(stddev=0.01))
self.conv3 = Conv2D(units_e, 1, 1, use_bias=False, kernel_initializer=tf.random_normal_initializer(stddev=0.01))
self.conv4 = Conv(units, 1, 1)
self.bn = BatchNormalization(momentum=0.03)
self.activation = Mish()
self.modules = tf.keras.Sequential([Bottleneck(units_e, shortcut, expansion=1.0) for _ in range(n_layer)])
def call(self, x):
y1 = self.conv3(self.modules(self.conv1(x)))
y2 = self.conv2(x)
return self.conv4(self.activation(self.bn(tf.concat([y1, y2], axis=-1))))
class BottleneckCSP2(Layer):
def __init__(self, units, n_layer=1, shortcut=False, expansion=0.5):
super(BottleneckCSP2, self).__init__()
units_e = int(units) # hidden channels
self.conv1 = Conv(units_e, 1, 1)
self.conv2 = Conv2D(units_e, 1, 1, use_bias=False, kernel_initializer=tf.random_normal_initializer(stddev=0.01))
self.conv3 = Conv(units, 1, 1)
self.bn = BatchNormalization()
self.activation = Mish()
self.modules = tf.keras.Sequential([Bottleneck(units_e, shortcut, expansion=1.0) for _ in range(n_layer)])
def call(self, x):
x1 = self.conv1(x)
y1 = self.modules(x1)
y2 = self.conv2(x1)
return self.conv3(self.activation(self.bn(tf.concat([y1, y2], axis=-1))))
class VoVCSP(Layer):
def __init__(self, units, expansion=0.5):
super(VoVCSP, self).__init__()
units_e = int(units * expansion)
self.conv1 = Conv(units_e // 2, 3, 1)
self.conv2 = Conv(units_e // 2, 3, 1)
self.conv3 = Conv(units_e, 1, 1)
def call(self, x):
_, x1 = tf.split(x, 2, axis=1)
x1 = self.conv1(x1)
x2 = self.conv2(x1)
return self.conv3(tf.concat([x1, x2], axis=-1))
class SPP(Layer):
def __init__(self, units, kernels=(5, 9, 13)):
super(SPP, self).__init__()
units_e = units // 2 # Todo:
self.conv1 = Conv(units_e, 1, 1)
self.conv2 = Conv(units, 1, 1)
self.modules = [MaxPool2D(pool_size=x, strides=1, padding='SAME') for x in kernels] # Todo: padding check
def call(self, x):
x = self.conv1(x)
return self.conv2(tf.concat([x] + [module(x) for module in self.modules], axis=-1))
class SPPCSP(Layer):
# Cross Stage Partial Networks
def __init__(self, units, n=1, shortcut=False, expansion=0.5, kernels=(5, 9, 13)):
super(SPPCSP, self).__init__()
units_e = int(2 * units * expansion)
self.conv1 = Conv(units_e, 1, 1)
self.conv2 = Conv2D(units_e, 1, 1, use_bias=False, kernel_initializer=tf.random_normal_initializer(stddev=0.01))
self.conv3 = Conv(units_e, 3, 1)
self.conv4 = Conv(units_e, 1, 1)
self.modules = [MaxPool2D(pool_size=x, strides=1, padding='same') for x in kernels]
self.conv5 = Conv(units_e, 1, 1)
self.conv6 = Conv(units_e, 3, 1)
self.bn = BatchNormalization()
self.act = Mish()
self.conv7 = Conv(units, 1, 1)
def call(self, x):
x1 = self.conv4(self.conv3(self.conv1(x)))
y1 = self.conv6(self.conv5(tf.concat([x1] + [module(x1) for module in self.modules], axis=-1)))
y2 = self.conv2(x)
return self.conv7(self.act(self.bn(tf.concat([y1, y2], axis=-1))))
class Upsample(Layer):
def __init__(self, i=None, ratio=2, method='bilinear'):
super(Upsample, self).__init__()
self.ratio = ratio
self.method = method
def call(self, x):
return tf.image.resize(x, (tf.shape(x)[1] * self.ratio, tf.shape(x)[2] * self.ratio), method=self.method)
class Concat(Layer):
def __init__(self, dims=-1):
super(Concat, self).__init__()
self.dims = dims
def call(self, x):
return tf.concat(x, self.dims)
Optimizer ¶
In [16]:
class Optimizer(object):
def __init__(self, optimizer_method='adam'):
self.optimizer_method = optimizer_method
def __call__(self):
if self.optimizer_method == 'adam':
return tf.keras.optimizers.Adam()
elif self.optimizer_method == 'rmsprop':
return tf.keras.optimizers.RMSprop()
elif self.optimizer_method == 'sgd':
return tf.keras.optimizers.SGD()
else:
raise ValueError('Unsupported optimizer {}'.format(self.optimizer_method))
class LrScheduler(object):
def __init__(self, total_steps, params, scheduler_method='cosine'):
if scheduler_method == 'step':
self.scheduler = Step(total_steps, params)
elif scheduler_method == 'cosine':
self.scheduler = Cosine(total_steps, params)
self.step_count = 0
self.total_steps = total_steps
def step(self):
self.step_count += 1
lr = self.scheduler(self.step_count)
return lr
def plot(self):
lr = []
for i in range(self.total_steps):
lr.append(self.step())
plt.plot(range(self.total_steps), lr)
plt.show()
class Step(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, total_steps, params):
# create the step learning rate with linear warmup
super(Step, self).__init__()
self.total_steps = total_steps
self.params = params
def __call__(self, global_step):
warmup_lr = self.params['warmup_learning_rate']
warmup_steps = self.params['warmup_steps']
init_lr = self.params['init_learning_rate']
lr_levels = self.params['learning_rate_levels']
lr_steps = self.params['learning_rate_steps']
assert warmup_steps < self.total_steps, "warmup {}, total {}".format(warmup_steps, self.total_steps)
linear_warmup = warmup_lr + tf.cast(global_step, tf.float32) / warmup_steps * (init_lr - warmup_lr)
learning_rate = tf.where(global_step < warmup_steps, linear_warmup, init_lr)
for next_learning_rate, start_step in zip(lr_levels, lr_steps):
learning_rate = tf.where(global_step >= start_step, next_learning_rate, learning_rate)
return learning_rate
class Cosine(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, total_steps, params):
# create the cosine learning rate with linear warmup
super(Cosine, self).__init__()
self.total_steps = total_steps
self.params = params
def __call__(self, global_step):
init_lr = self.params['init_learning_rate']
warmup_lr = self.params['warmup_learning_rate'] if 'warmup_learning_rate' in self.params else 0.0
warmup_steps = self.params['warmup_steps']
assert warmup_steps < self.total_steps, "warmup {}, total {}".format(warmup_steps, self.total_steps)
linear_warmup = warmup_lr + tf.cast(global_step, tf.float32) / warmup_steps * (init_lr - warmup_lr)
cosine_learning_rate = init_lr * (
tf.cos(np.pi * (global_step - warmup_steps) / (self.total_steps - warmup_steps)) + 1.0) / 2.0
learning_rate = tf.where(global_step < warmup_steps, linear_warmup, cosine_learning_rate)
return learning_rate
Post Process ¶
In [17]:
def batch_non_max_suppression(prediction, conf_threshold=0.5, iou_threshold=0.25, classes=None, agnostic=False,
labels=()):
"""Performs Non-Maximum Suppression (NMS) on inference results
prediction: batch_size * 3grid * (num_classes + 5)
Returns:
detections with shape: nx6 (x1, y1, x2, y2, conf, cls)
"""
num_classes = tf.shape(prediction)[-1] - 5
candidates = prediction[..., 4] > conf_threshold
output = [tf.zeros((0, 6))] * prediction.shape[0]
for i, pred in enumerate(prediction): # iter for image
pred = pred[candidates[i]] # filter by yolo confidence
if not pred.shape[0]:
continue
box = xywh2xyxy(pred[:, :4])
score = pred[:, 4]
classes = tf.argmax(pred[..., 5:], axis=-1)
pred_nms = []
for clss in tf.unique(classes)[0]:
mask = tf.math.equal(classes, clss)
box_of_clss = tf.boolean_mask(box, mask) # n_conf * 4
classes_of_clss = tf.boolean_mask(classes, mask) # n_conf
score_of_clss = tf.boolean_mask(score, mask) # n_conf
select_indices = tf.image.non_max_suppression(box_of_clss, score_of_clss, max_output_size=50,
iou_threshold=iou_threshold) # for one class
box_of_clss = tf.gather(box_of_clss, select_indices)
score_of_clss = tf.gather(tf.expand_dims(score_of_clss, -1), select_indices)
classes_of_clss = tf.cast(tf.gather(tf.expand_dims(classes_of_clss, -1), select_indices), tf.float32)
pred_of_clss = tf.concat([box_of_clss, score_of_clss, classes_of_clss], axis=-1)
pred_nms.append(pred_of_clss)
output[i] = tf.concat(pred_nms, axis=0)
return output
def weighted_boxes_fusion():
return
My Yolo V5 Model ¶
In [18]:
class MyYolo(object):
def __init__(self, yaml_dir):
with open(yaml_dir) as f:
yaml_dict = yaml.load(f, Loader=yaml.FullLoader)
self.module_list = self.parse_model(yaml_dict)
module = self.module_list[-1]
if isinstance(module, Detect):
# transfer the anchors to grid coordinator, 3 * 3 * 2
module.anchors /= tf.reshape(module.stride, [-1, 1, 1])
def __call__(self, img_size, name='yolo'):
x = tf.keras.Input([img_size, img_size, 3])
output = self.forward(x)
return tf.keras.Model(inputs=x, outputs=output, name=name)
def forward(self, x):
y = []
for module in self.module_list:
if module.f != -1: # if not from previous layer
if isinstance(module.f, int):
x = y[module.f]
else:
x = [x if j == -1 else y[j] for j in module.f]
x = module(x)
y.append(x)
return x
def parse_model(self, yaml_dict):
anchors, nc = yaml_dict['anchors'], yaml_dict['nc']
depth_multiple, width_multiple = yaml_dict['depth_multiple'], yaml_dict['width_multiple']
num_anchors = (len(anchors[0]) // 2) if isinstance(anchors, list) else anchors
output_dims = num_anchors * (nc + 5)
layers = []
# from, number, module, args
for i, (f, number, module, args) in enumerate(yaml_dict['backbone'] + yaml_dict['head']):
# all component is a Class, initialize here, call in self.forward
module = eval(module) if isinstance(module, str) else module
for j, arg in enumerate(args):
try:
args[j] = eval(arg) if isinstance(arg, str) else arg # eval strings, like Detect(nc, anchors)
except:
pass
number = max(round(number * depth_multiple), 1) if number > 1 else number # control the model scale
if module in [Conv2D, Conv, Bottleneck, SPP, DWConv, Focus, BottleneckCSP, BottleneckCSP2, SPPCSP, VoVCSP]:
c2 = args[0]
c2 = math.ceil(c2 * width_multiple / 8) * 8 if c2 != output_dims else c2
args = [c2, *args[1:]]
if module in [BottleneckCSP, BottleneckCSP2, SPPCSP, VoVCSP]:
args.insert(1, number)
number = 1
modules = tf.keras.Sequential(*[module(*args) for _ in range(number)]) if number > 1 else module(*args)
modules.i, modules.f = i, f
layers.append(modules)
return layers
class Detect(Layer):
def __init__(self, num_classes, anchors=()):
super(Detect, self).__init__()
self.num_classes = num_classes
self.num_scale = len(anchors)
self.output_dims = self.num_classes + 5
self.num_anchors = len(anchors[0])//2
self.stride = np.array([8, 16, 32], np.float32) # fixed here, modify if structure changes
self.anchors = tf.cast(tf.reshape(anchors, [self.num_anchors, -1, 2]), tf.float32)
self.modules = [Conv2D(self.output_dims * self.num_anchors, 1, use_bias=False) for _ in range(self.num_scale)]
def call(self, x, training=True):
res = []
for i in range(self.num_scale): # number of scale layer, default=3
y = self.modules[i](x[i])
_, grid1, grid2, _ = y.shape
y = tf.reshape(y, (-1, grid1, grid2, self.num_scale, self.output_dims))
grid_xy = tf.meshgrid(tf.range(grid1), tf.range(grid2)) # grid[x][y]==(y,x)
grid_xy = tf.cast(tf.expand_dims(tf.stack(grid_xy, axis=-1), axis=2),tf.float32)
y_norm = tf.sigmoid(y) # sigmoid for all dims
xy, wh, conf, classes = tf.split(y_norm, (2, 2, 1, self.num_classes), axis=-1)
pred_xy = (xy * 2. - 0.5 + grid_xy) * self.stride[i] # decode pred to xywh
pred_wh = (wh * 2) ** 2 * self.anchors[i] * self.stride[i]
out = tf.concat([pred_xy, pred_wh, conf, classes], axis=-1)
res.append(out)
return res
Let's see the update Yaml File, nb classes = 4 and anchorss are generated with script below
In [58]:
!cat kyolov5/myyolo-m-mish.yaml
# parameters nc: 4 # 20 for voc number of classes depth_multiple: 0.67 # model depth multiple width_multiple: 0.75 # layer channel multiple # anchors anchors: - [16,13, 12,26, 32,26] # P3/8 - [32,68, 55,42, 86,66] # P4/16 - [65,123, 128,110, 223,277] # P5/32 # YOLOv5 backbone backbone: # [from, number, module, args] [[-1, 1, Focus, [64, 3]], # 0-P1/2 [-1, 1, Conv, [128, 3, 2]], # 1-P2/4 [-1, 3, BottleneckCSP, [128]], [-1, 1, Conv, [256, 3, 2]], # 3-P3/8 [-1, 9, BottleneckCSP, [256]], [-1, 1, Conv, [512, 3, 2]], # 5-P4/16 [-1, 9, BottleneckCSP, [512]], [-1, 1, Conv, [1024, 3, 2]], # 7-P5/32 [-1, 1, SPP, [1024, [5, 9, 13]]], [-1, 3, BottleneckCSP, [1024, False]], # 9 ] # YOLOv5 head head: [[-1, 1, Conv, [512, 1, 1]], [-1, 1, Upsample, [None, 2, 'nearest']], [[-1, 6], 1, Concat, [-1]], # cat backbone P4 [-1, 3, BottleneckCSP, [512, False]], # 13 [-1, 1, Conv, [256, 1, 1]], [-1, 1, Upsample, [None, 2, 'nearest']], [[-1, 4], 1, Concat, [-1]], # cat backbone P3 [-1, 3, BottleneckCSP, [256, False]], # 17 (P3/8-small) [-1, 1, Conv, [256, 3, 2]], [[-1, 14], 1, Concat, [-1]], # cat head P4 [-1, 3, BottleneckCSP, [512, False]], # 20 (P4/16-medium) [-1, 1, Conv, [512, 3, 2]], [[-1, 10], 1, Concat, [-1]], # cat head P5 [-1, 3, BottleneckCSP, [1024, False]], # 23 (P5/32-large) [[17, 20, 23], 1, Detect, [nc, anchors]], # Detect(P3, P4, P5) ]
Build Model and Generate y_true with Anchors
In [19]:
mymodel = MyYolo(yaml_dir=params['yaml_dir'])
anchors = mymodel.module_list[-1].anchors
stride = mymodel.module_list[-1].stride
num_classes = mymodel.module_list[-1].num_classes
print("anchors =",anchors,"stride=",stride,"classes=",num_classes)
anchors = tf.Tensor( [[[2. 1.625 ] [1.5 3.25 ] [4. 3.25 ]] [[2. 4.25 ] [3.4375 2.625 ] [5.375 4.125 ]] [[2.03125 3.84375] [4. 3.4375 ] [6.96875 8.65625]]], shape=(3, 3, 2), dtype=float32) stride= [ 8. 16. 32.] classes= 4
In [20]:
'''loss_fn = MyYoloLoss(mymodel.module_list[-1].anchors,
ignore_iou_threshold=0.3,
num_classes=num_classes,
label_smoothing=params['label_smoothing'],
img_size=params['img_size'])
optimizer = Optimizer('adam')() '''
Out[20]:
"loss_fn = MyYoloLoss(mymodel.module_list[-1].anchors,\n ignore_iou_threshold=0.3,\n num_classes=num_classes,\n label_smoothing=params['label_smoothing'],\n img_size=params['img_size'])\noptimizer = Optimizer('adam')() "
In [21]:
'''model = mymodel(params['img_size'])'''
Out[21]:
"model = mymodel(params['img_size'])"
In [22]:
mymodel.module_list[-1].anchors
Out[22]:
<tf.Tensor: shape=(3, 3, 2), dtype=float32, numpy= array([[[2. , 1.625 ], [1.5 , 3.25 ], [4. , 3.25 ]], [[2. , 4.25 ], [3.4375 , 2.625 ], [5.375 , 4.125 ]], [[2.03125, 3.84375], [4. , 3.4375 ], [6.96875, 8.65625]]], dtype=float32)>
In [54]:
class DataReader_jl(object):
'''
read the image and label from the text information (generated by dataset/prepare_data.py)
resize the image, and adjust the label rect if necessary
augment the dataset (augment function is defined in dataset/augment_data.py)
'''
def __init__(self, annotations, img_size=416, transforms=None, mosaic=False, augment=False, filter_idx=None, test=False):
self.annotations = annotations
self.idx = range(len(self.annotations))
self.img_size = img_size # image_target_size
self.transforms = transforms
self.mosaic = mosaic
self.augment = augment
self.test = test
self.images_dir = []
self.labels_ori = [] # original labels
if filter_idx is not None: # filter some samples
self.idx = [i for i in self.idx if i in filter_idx]
print('filter {} from {}'.format(len(self.idx), len(self.annotations)))
for i in self.idx:
image_dir, label =self.parse_annotations(self.annotations[i])
self.images_dir.append(image_dir)
self.labels_ori.append(label)
def __len__(self):
return len(self.annotations)
def __getitem__(self, idx):
#print("idx ======",idx)
if self.test:
img = load_image_jl(self.annotations[idx])
img = resize_image(img, self.img_size, keep_ratio=True)
return img
if self.mosaic: # mosaic need to load 4 images
#print("************** mosaique")
mosaic_border = [-self.img_size // 2, -self.img_size // 2]
img, label = load_mosaic_image_jl(idx, mosaic_border, self.img_size, self.images_dir, self.labels_ori)
else:
#print("h----------------")
#print(idx)
img, label = load_image_and_labels_jl(idx)
#print(label)
#print("===============")
#print("============ label",label)
if self.transforms:
img, label = transforms_jl(img, label, mosaic=self.mosaic, augment=self.augment)
img, label = resize_image_jl(img, self.img_size, keep_ratio=True, label=label) # resize the image
return img, label
def iter(self):
for i in self.annotations:
yield self[i]
def parse_annotations(self, annotation):
#example = annotation.split()
img_dir = annotation
tt=img_dir.replace("labels","images").replace("txt","png")
# ==> label = np.array([list(map(float, box.split(',')[0: 5])) for box in example[1:]])
# image_dir/001.jpg x_min, y_min, x_max, y_max, class_id x_min2, y_min2, x_max2, y_max2, class_id2
label = np.loadtxt(annotation)
if len(label.shape)==1:
label=label[np.newaxis,:]
label=label[:,[1,2,3,4,0]] # class from first to last position
# assert label.shape[1] == 5, "Label have and only have 5 dims: xmin, ymin, xmax, ymax, class"
# assert np.max(label[:, 0:4]) <= 1, "Label box should be (0, 1), {}".format(annotation)
return tt, label
Data Loader ¶
In [55]:
class DataLoader_jl(object):
'''
data pipeline from data_reader (image,label) to tf.data
'''
def __init__(self, data_reader, anchors, stride, img_size=416, anchor_assign_method='wh',
anchor_positive_augment=True):
self.data_reader = data_reader
self.anchor_label_js = AnchorLabeler_jl(anchors,
grids=img_size / stride,
img_size=img_size,
assign_method=anchor_assign_method,
extend_offset=anchor_positive_augment
)
'''anchor_match_threshold=0.02'''
self.img_size = img_size
def __call__(self, batch_size=8, anchor_label=True):
dataset = tf.data.Dataset.from_generator(self.data_reader.iter,
output_types=(tf.float32, tf.float32),
output_shapes=([self.img_size, self.img_size, 3], [None, 5]))
if anchor_label: # when train
dataset = dataset.map(self.transform_js, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
return dataset
def transform_js(self, image, label):
label_encoder = self.anchor_label_js.encode_js(label)
return image, label_encoder
Get the y_truth for a label ¶
In [56]:
class AnchorLabeler_jl(object):
# transfer the annotated label to model target by anchor encoding, to calculate anchor based loss next step
def __init__(self, anchors, grids, img_size=416, assign_method='wh', extend_offset=True, rect_style='rect4', anchor_match_threshold=4.0): # 4.0 or 0.3
self.anchors = anchors # from yaml.anchors to Detect.anchors, w/h based on grid coordinators
self.grids = grids
self.img_size = img_size
self.assign_method = assign_method
self.extend_offset = extend_offset
self.rect_style = rect_style
self.anchor_match_threshold = anchor_match_threshold
def encode_js(self, labels):
''' This is important for Yolo series.
key part is: assign the label to which anchor and which grid, new encoding method of V4 solved the grid sensitivity problem
labels: (n_bs * n_gt * 5), x/y/w/h/class, normalized image coordinators
anchors: (3 * 3 * 2), scale * anchor_per_scale * wh,
return: [[], [], []]
'''
self.num_scales = self.anchors.shape[0]
self.n_anchor_per_scale = self.anchors.shape[1]
y_anchor_encode = []
gain = tf.ones(5, tf.float32)
for i in range(self.num_scales):
anchor = self.anchors[i]
grid_size = tf.cast(self.grids[i], tf.int32)
y_true = tf.zeros([grid_size, grid_size, self.n_anchor_per_scale, 6], tf.float32)
gain = tf.tensor_scatter_nd_update(gain, [[0], [1], [2], [3]], [grid_size] * 4)
scaled_labels = labels * gain # label coordinator now is the same with anchors
if labels is not None:
gt_wh = scaled_labels[..., 2:4] # n_gt * 2
if self.assign_method == 'wh':
assert self.anchor_match_threshold > 1, 'threshold is totally different for wh and iou assign'
matched_matrix = self.assign_criterion_wh(gt_wh, anchor, self.anchor_match_threshold)
elif self.assign_method == 'iou':
assert self.anchor_match_threshold < 1, 'threshold is totally different for wh and iou assign'
matched_matrix = self.assign_criterion_iou(gt_wh, anchor, self.anchor_match_threshold)
else:
raise ValueError
n_gt = tf.shape(gt_wh)[0]
assigned_anchor = tf.tile(tf.reshape(tf.range(self.n_anchor_per_scale), (self.n_anchor_per_scale, 1)),
(1, n_gt))
assigned_anchor = tf.expand_dims(assigned_anchor[matched_matrix], 1) # filter
assigned_anchor = tf.cast(assigned_anchor, tf.int32)
assigned_label = tf.tile(tf.expand_dims(scaled_labels, 0), [self.n_anchor_per_scale, 1, 1])
assigned_label = assigned_label[matched_matrix]
if self.extend_offset:
assigned_label, assigned_anchor, grid_offset = self.enrich_pos_by_position(
assigned_label, assigned_anchor, gain, matched_matrix)
else:
grid_offset = tf.zeros_like(assigned_label[:, 0:2])
assigned_grid = tf.cast(assigned_label[..., 0:2] - grid_offset, tf.int32) # n_matched * 2
assigned_grid = tf.clip_by_value(assigned_grid, clip_value_min=0, clip_value_max=grid_size-1)
# tensor: grid * grid * 3 * 6, indices(sparse index): ~n_gt * gr * gr * 3, updates: ~n_gt * 6
assigned_indices = tf.concat([assigned_grid[:, 1:2], assigned_grid[:, 0:1], assigned_anchor],
axis=1)
xy, wh, clss = tf.split(assigned_label, (2, 2, 1), axis=-1)
xy = xy / gain[0] * self.img_size
wh = wh / gain[1] * self.img_size
obj = tf.ones_like(clss)
assigned_updates = tf.concat([xy, wh, obj, clss], axis=-1)
y_true = tf.tensor_scatter_nd_update(y_true, assigned_indices, assigned_updates)
y_anchor_encode.append(y_true)
return tuple(y_anchor_encode) # add a tuple is important here, otherwise raise an error
def assign_criterion_wh(self, gt_wh, anchors, anchor_threshold):
# return: please note that the v5 default anchor_threshold is 4.0, related to the positive sample augment
gt_wh = tf.expand_dims(gt_wh, 0) # => 1 * n_gt * 2
anchors = tf.expand_dims(anchors, 1) # => n_anchor * 1 * 2
ratio = gt_wh / anchors # => n_anchor * n_gt * 2
matched_matrix = tf.reduce_max(tf.math.maximum(ratio, 1 / ratio),
axis=2) < anchor_threshold # => n_anchor * n_gt
return matched_matrix
def assign_criterion_iou(self, gt_wh, anchors, anchor_threshold):
# by IOU, anchor_threshold < 1
box_wh = tf.expand_dims(gt_wh, 0) # => 1 * n_gt * 2
box_area = box_wh[..., 0] * box_wh[..., 1] # => 1 * n_gt
anchors = tf.cast(anchors, tf.float32) # => n_anchor * 2
anchors = tf.expand_dims(anchors, 1) # => n_anchor * 1 * 2
anchors_area = anchors[..., 0] * anchors[..., 1] # => n_anchor * 1
inter = tf.math.minimum(anchors[..., 0], box_wh[..., 0]) * tf.math.minimum(anchors[..., 1],
box_wh[..., 1]) # n_gt * n_anchor
iou = inter / (anchors_area + box_area - inter + 1e-9)
iou = iou > anchor_threshold
return iou
def enrich_pos_by_position(self, assigned_label, assigned_anchor, gain, matched_matrix, rect_style='rect4'):
# using offset to extend more postive result, if x
assigned_xy = assigned_label[..., 0:2] # n_matched * 2
offset = tf.constant([[0, 0], [1, 0], [0, 1], [-1, 0], [0, -1]], tf.float32)
grid_offset = tf.zeros_like(assigned_xy)
if rect_style == 'rect2':
g = 0.2 # offset
elif rect_style == 'rect4':
g = 0.5 # offset
matched = (assigned_xy % 1. < g) & (assigned_xy > 1.)
matched_left = matched[:, 0]
matched_up = matched[:, 1]
matched = (assigned_xy % 1. > (1 - g)) & (assigned_xy < tf.expand_dims(gain[0:2], 0) - 1.)
matched_right = matched[:, 0]
matched_down = matched[:, 1]
assigned_anchor = tf.concat([assigned_anchor, assigned_anchor[matched_left], assigned_anchor[matched_up],
assigned_anchor[matched_right], assigned_anchor[matched_down]], axis=0)
assigned_label = tf.concat([assigned_label, assigned_label[matched_left], assigned_label[matched_up],
assigned_label[matched_right], assigned_label[matched_down]], axis=0)
grid_offset = g * tf.concat(
[grid_offset, grid_offset[matched_left] + offset[1], grid_offset[matched_up] + offset[2],
grid_offset[matched_right] + offset[3], grid_offset[matched_down] + offset[4]], axis=0)
return assigned_label, assigned_anchor, grid_offset
Trainer / Fit ¶
In [26]:
class Trainer(object):
""" Trainer class that uses the dataset and model to train
# Usage
data_loader = tf.data.Dataset()
trainer = Trainer(params)
trainer.train(data_loader)
"""
global themodel
def __init__(self, params,mymodel, transfer='scratch'):
""" Constructor
:param params: dict, with dir and training parameters
"""
self.params = params
#if os.path.exists(self.params['log_dir']):
#shutil.rmtree(self.params['log_dir'])
self.log_writer = tf.summary.create_file_writer(self.params['log_dir'])
self.global_step = tf.Variable(0, trainable=False, dtype=tf.int64)
self.transfer=transfer
self.build_model()
def build_model(self):
""" Build the model,
define the training strategy and model, loss, optimizer
:return:
"""
if self.params['multi_gpus']:
self.strategy = tf.distribute.MirroredStrategy(devices=None)
else:
self.strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
with self.strategy.scope():
self.model = MyYolo(yaml_dir=self.params['yaml_dir'])
self.anchors = self.model.module_list[-1].anchors
self.stride = self.model.module_list[-1].stride
self.num_classes = self.model.module_list[-1].num_classes
self.loss_fn = MyYoloLoss(self.model.module_list[-1].anchors,
ignore_iou_threshold=0.3,
num_classes=self.num_classes,
label_smoothing=self.params['label_smoothing'],
img_size=self.params['img_size'])
self.optimizer = Optimizer('adam')()
themodel=self.model
def train(self, train_dataset, valid_dataset=None):
""" train function
:param train_dataset: train dataset built by tf.data
:param valid_dataset: valid dataset build by td.data, optional
:param transfer: pretrain
:return:
"""
all_losses=[]
print("type",type(train_dataset.len),train_dataset.len, type(self.params['batch_size']),self.params['batch_size'])
steps_per_epoch = train_dataset.len / self.params['batch_size']
self.total_steps = int(self.params['n_epochs'] * steps_per_epoch)
self.params['warmup_steps'] = self.params['warmup_epochs'] * steps_per_epoch
with self.strategy.scope():
self.lr_scheduler = LrScheduler(self.total_steps, self.params, scheduler_method='cosine')
# => tf.keras.Model
self.model = self.model(self.params['img_size'])
#self.model=model
self.ckpt = tf.train.Checkpoint(model=self.model, optimizer=self.optimizer)
self.ckpt_manager = tf.train.CheckpointManager(self.ckpt, self.params['checkpoint_dir'], max_to_keep=5)
if self.transfer == 'darknet':
print("Load weights from ")
model_pretrain = MyYolo(self.params['yaml_dir'])()
model_pretrain.load_weights()
self.model.get_layer().set_weights()
elif self.transfer == 'resume':
print("Load weights from latest checkpoint")
self.ckpt.restore(self.ckpt_manager.latest_checkpoint)
elif self.transfer == 'scratch':
print("Train from scratch")
#print(self.model.summary())
train_dataset = self.strategy.experimental_distribute_dataset(train_dataset)
for epoch in range(1, self.params['n_epochs'] + 1):
for step, (image, target) in enumerate(train_dataset):
loss = self.dist_train_step(image, target)
all_losses.append(loss)
clear_output(wait=True)
print('=> Epoch {}, Step {}, Loss {:.5f}'.format(epoch, self.global_step.numpy(), loss.numpy()))
with self.log_writer.as_default():
tf.summary.scalar('loss', loss, step=self.global_step)
tf.summary.scalar('lr', self.optimizer.lr, step=self.global_step)
self.log_writer.flush()
if epoch % 3 == 0:
ckpt_save_path = self.ckpt_manager.save()
print('Saving checkpoint for epoch {} at {}'.format(epoch, ckpt_save_path))
self.export_model()
themodel=self.model
return self.model, all_losses
# @tf.function
def train_step(self, image, target):
with tf.GradientTape() as tape:
logit = self.model(image, training=True)
iou_loss, conf_loss, prob_loss = self.loss_fn(target, logit)
total_loss = iou_loss + conf_loss + prob_loss
gradients = tape.gradient(total_loss, self.model.trainable_variables)
self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
lr = self.lr_scheduler.step()
self.optimizer.lr.assign(lr)
self.global_step.assign_add(1)
return total_loss
@tf.function
def dist_train_step(self, image, target):
with self.strategy.scope():
loss = self.strategy.run(self.train_step, args=(image, target))
total_loss_mean = self.strategy.reduce(tf.distribute.ReduceOp.MEAN, loss, axis=None)
return total_loss_mean
def validate(self, valid_dataset):
valid_loss = []
for step, (image, target) in enumerate(valid_dataset):
step_valid_loss = self.valid_step(image, target)
valid_loss.append(step_valid_loss)
return np.mean(valid_loss)
def valid_step(self, image, label):
logit = self.model(image, training=False)
iou_loss, conf_loss, prob_loss = self.loss_fn(label, logit)
return iou_loss + conf_loss + prob_loss
def export_model(self):
tf.saved_model.save(self.model, self.params['saved_model_dir'])
print("pb model saved in {}".format(self.params['saved_model_dir']))
In [27]:
trainer = Trainer(params,mymodel)
In [28]:
print(trainer.anchors,trainer.stride)
tf.Tensor( [[[2. 1.625 ] [1.5 3.25 ] [4. 3.25 ]] [[2. 4.25 ] [3.4375 2.625 ] [5.375 4.125 ]] [[2.03125 3.84375] [4. 3.4375 ] [6.96875 8.65625]]], shape=(3, 3, 2), dtype=float32) [ 8. 16. 32.]
In [29]:
trainReader = DataReader_jl(train_labels,
img_size=params['img_size'],
transforms=transforms_jl,
mosaic=params['mosaic_data'],
augment=params['augment_data'],
filter_idx=None)
In [30]:
trainLoader = DataLoader_jl(trainReader,
trainer.anchors,
trainer.stride,
params['img_size'],
params['anchor_assign_method'],
params['anchor_positive_augment'])
In [31]:
train_dataset = trainLoader(batch_size=32, anchor_label=True)
train_dataset.len = len(trainReader)
Val Dataset ¶
In [32]:
valReader = DataReader_jl(val_labels,
img_size=params['img_size'],
transforms=transforms_jl,
mosaic=params['mosaic_data'],
augment=params['augment_data'],
filter_idx=None)
In [33]:
valLoader = DataLoader_jl(valReader,
trainer.anchors,
trainer.stride,
params['img_size'],
params['anchor_assign_method'],
params['anchor_positive_augment'])
In [34]:
val_dataset = valLoader(batch_size=32, anchor_label=True)
val_dataset.len = len(valReader)
Check our Loaded data ¶
In [35]:
for i , (image, target) in enumerate(train_dataset):
print("i=",i, image.shape, target[2].shape)
if i==6:
break
print("bbox infer",len(target),target[0].shape)
i= 0 (32, 416, 416, 3) (32, 13, 13, 3, 6) i= 1 (32, 416, 416, 3) (32, 13, 13, 3, 6) i= 2 (32, 416, 416, 3) (32, 13, 13, 3, 6) i= 3 (32, 416, 416, 3) (32, 13, 13, 3, 6) i= 4 (32, 416, 416, 3) (32, 13, 13, 3, 6) i= 5 (32, 416, 416, 3) (32, 13, 13, 3, 6) i= 6 (32, 416, 416, 3) (32, 13, 13, 3, 6) bbox infer 3 (32, 52, 52, 3, 6)
In [36]:
target[1][0].shape
Out[36]:
TensorShape([26, 26, 3, 6])
In [37]:
plt.figure(figsize=(10,10))
plt.imshow(image[0])
plt.title('Res') #Give this plot a title,
#so I know it's from matplotlib and not cv2
plt.show()
pred=[]
for i in range(3):
print("i==",i)
true_box, true_class = tf.split(target[i], (5, -1), axis=-1)
print("true ",true_class.numpy().max())
true_class = tf.squeeze(tf.one_hot(tf.cast(true_class, tf.dtypes.int32), depth=4, axis=-1), -2)
print(true_class.shape,true_box.shape)
print("sqeez ",true_class.numpy().max())
tt=tf.dtypes.cast(true_class, tf.float32)
pred_bbox =tf.concat([true_box,tt], axis=-1)
pred.append(pred_bbox)
print(len(pred))
pred_bbox = [tf.reshape(x, (tf.shape(x)[0], -1, tf.shape(x)[-1])) for x in pred]
pred_bbox = tf.concat(pred_bbox, axis=1) # batch_size * -1 * (num_class + 5)
bboxes = batch_non_max_suppression(pred_bbox, conf_threshold=0.5, iou_threshold=0.5)
bboxes = bboxes[0].numpy() # batch is 1 for detect
#bboxes = resize_back(bboxes, target_sizes=img_size, original_shape=original_shape) # adjust box to original size
print("bboxex=",bboxes)
rres=draw_box(image[0].numpy(), bboxes, class_id_to_name_mapping)
plt.figure(figsize=(10,10))
plt.imshow(rres)
plt.title('Res')
plt.show()
i== 0 true 3.0 (32, 52, 52, 3, 4) (32, 52, 52, 3, 5) sqeez 1.0 i== 1 true 3.0 (32, 26, 26, 3, 4) (32, 26, 26, 3, 5) sqeez 1.0 i== 2 true 3.0 (32, 13, 13, 3, 4) (32, 13, 13, 3, 5) sqeez 1.0 3
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
bboxex= [[180.1769 187.068 233.22566 230.6388 1. 2. ]]
In [38]:
params['n_epochs']=400
trainer = Trainer(params,mymodel,transfer='resume')
mmymodel, all_losses=trainer.train(train_dataset,valid_dataset='val_dataset')
=> Epoch 10, Step 220, Loss 6.20235
2022-06-23 18:08:56.001356: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. WARNING:absl:Found untraced functions such as conv_150_layer_call_fn, conv_150_layer_call_and_return_conditional_losses, conv2d_189_layer_call_fn, conv2d_189_layer_call_and_return_conditional_losses, conv_152_layer_call_fn while saving (showing 5 of 322). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: kyolov5/myyolov5/assets
INFO:tensorflow:Assets written to: kyolov5/myyolov5/assets
pb model saved in kyolov5/myyolov5
In [53]:
#all_losses
#mmymodel.summary()
# Save the weights
#model.save_weights('./checkpoints/my_checkpoint')
# Create a new model instance
# model = create_model()
# Restore the weights
#model.load_weights('./checkpoints/my_checkpoint')
# Evaluate the model
#loss, acc = model.evaluate(test_images, test_labels, verbose=2)
#print("Restored model, accuracy: {:5.2f}%".format(100 * acc))
# Save the entire model as a SavedModel.
#!mkdir -p saved_model
#mmymodel.save('saved_model/my_model')
tf.saved_model.save(mmymodel,'saved_model/my_model')
WARNING:absl:Found untraced functions such as conv_150_layer_call_fn, conv_150_layer_call_and_return_conditional_losses, conv2d_189_layer_call_fn, conv2d_189_layer_call_and_return_conditional_losses, conv_152_layer_call_fn while saving (showing 5 of 322). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: saved_model/my_model/assets
INFO:tensorflow:Assets written to: saved_model/my_model/assets
In [40]:
plt.figure(figsize=(8, 8))
plt.plot(all_losses)
plt.title('Training and Validation Loss')
plt.show()
Check detection ¶
In [41]:
#model = tf.saved_model.load('kyolov5/weights/yolov5')
In [42]:
def draw_box_2(image, label, classes_map=None):
# label: xyxy
box = label[:, 0:4].copy()
print(box)
classes = label[:, -1]
print(classes)
if np.max(box) <= 1:
box[:, [0, 2]] = box[:, [0, 2]] * image.shape[1]
box[:, [1, 3]] = box[:, [1, 3]] * image.shape[0]
if not isinstance(box, int):
box = box.astype(np.int16)
image_h, image_w, _ = image.shape
num_classes = len(class_name_to_id_mapping) if class_name_to_id_mapping is not None else len(range(int(np.max(class_name_to_id_mapping)) + 1))
hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)]
colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
colors = list(map(lambda x: (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), colors))
bbox_thick = int(0.6 * (image_h + image_w) / 600)
font_scale = 0.5
for i in range(label.shape[0]):
x1y1 = tuple(box[i, 0:2])
x2y2 = tuple(box[i, 2:4])
class_ind = int(classes[i])
bbox_color = colors[class_ind]
image = cv2.rectangle(image, x1y1, x2y2, bbox_color, bbox_thick)
# show labels
if classes_map is not None:
class_ind = classes_map[class_ind]
else:
class_ind = str(class_ind)
if label.shape[-1] == 6:
score = ': ' + str(round(label[i, -2], 2))
else:
score = ''
bbox_text = '%s %s' % (class_ind, score)
t_size = cv2.getTextSize(bbox_text, 0, font_scale, thickness=bbox_thick//2)[0]
cv2.rectangle(image, x1y1, (x1y1[0] + t_size[0], x1y1[1] - t_size[1] - 3), bbox_color, -1) # filled
cv2.putText(image, bbox_text, (x1y1[0], x1y1[1]-2), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 0), bbox_thick//2, lineType=cv2.LINE_AA)
plt.figure(figsize=(10,10))
plt.imshow(image)
plt.title('Res') #Give this plot a title,
#so I know it's from matplotlib and not cv2
plt.show()
return image
In [43]:
def image_demo(img, model, img_size=416, class_names=None, conf_threshold=0.4, iou_threshold=0.3):
original_shape = img.shape
img_input = resize_image_jl(img, target_sizes=img_size)
img_input = img_input[np.newaxis, ...].astype(np.float32)
img_input = img_input / 255.
pred_bbox = model(img_input)
pred_bbox = [tf.reshape(x, (tf.shape(x)[0], -1, tf.shape(x)[-1])) for x in pred_bbox]
pred_bbox = tf.concat(pred_bbox, axis=1) # batch_size * -1 * (num_class + 5)
bboxes = batch_non_max_suppression(pred_bbox, conf_threshold=conf_threshold, iou_threshold=iou_threshold)
bboxes = bboxes[0].numpy() # batch is 1 for detect
bboxes = resize_back(bboxes, target_sizes=img_size, original_shape=original_shape) # adjust box to original size
if bboxes.any():
image = draw_box_2(img, np.array(bboxes), class_names)
#cv2.imwrite('./demo.jpg', cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
else:
print('No box detected')
In [47]:
def test_image_demo(img_dir, model_dir, img_size=416, class_names=None, conf_threshold=0.4, iou_threshold=0.3):
img = cv2.imread(img_dir)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
plt.figure(figsize=(10,10))
plt.imshow(img)
plt.title('Detection') #Give this plot a title, #so I know it's from matplotlib and not cv2
plt.show()
#model = tf.saved_model.load(model_dir)
image_demo(img, mmymodel, img_size=img_size, class_names=class_names, conf_threshold=conf_threshold, iou_threshold=iou_threshold)
In [48]:
test_image_demo('data/Road_Sign_Dataset/images/train/road117.png','kyolov5/weights/yolov5',class_names=class_id_to_name_mapping,conf_threshold=0.2, iou_threshold=0.2)
[[219.16751 34.420074 287.8386 104.80192 ]] [2.]
In [49]:
test_image_demo('data/Road_Sign_Dataset/images/test/road492.png','kyolov5/weights/yolov5',class_names=class_id_to_name_mapping,conf_threshold=0.2, iou_threshold=0.2)
[[106.371956 136.11127 185.77998 209.06606 ]] [2.]
In [50]:
test_image_demo('data/Road_Sign_Dataset/images/train/road876.png','kyolov5/weights/yolov5',class_names=class_id_to_name_mapping,conf_threshold=0.2, iou_threshold=0.2)
[[105.56413 111.94829 181.53271 181.94147] [ 82.76357 295.58542 97.6198 313.9564 ]] [2. 1.]
In [51]:
test_image_demo('data/Road_Sign_Dataset/images/train/road821.png','kyolov5/weights/yolov5',class_names=class_id_to_name_mapping,conf_threshold=0.2, iou_threshold=0.2)
[[107.12087 103.83834 157.95752 156.03476 ] [147.25017 173.05266 193.88777 223.48944 ] [216.8995 272.40665 233.21227 296.98642 ] [104.43091 183.59987 135.72337 261.1297 ] [ 81.776985 263.8541 109.88999 309.32343 ] [ 21.743738 211.65175 38.09965 246.56895 ] [-48.23551 -8.996755 4.556304 104.25527 ]] [1. 3. 3. 0. 0. 0. 0.]
In [ ]:
Generate Anchors Tool ¶
In [ ]:
class Anchor(object):
# create the default anchors by k-means
def __init__(self):
pass
def kmeans(self, boxes, k, dist=np.mean):
n_examples = boxes.shape[0]
distances = np.empty((n_examples, k))
last_clusters = np.zeros((n_examples,))
clusters = boxes[np.random.choice(n_examples, k, replace=False)]
while True:
for example in range(n_examples):
distances[example] = 1 - self.iou(boxes[example], clusters)
nearest_clusters = np.argmin(distances, axis=1)
if (last_clusters == nearest_clusters).all():
break
for cluster in range(k):
clusters[cluster] = dist(boxes[nearest_clusters == cluster], axis=0)
last_clusters = nearest_clusters
return clusters
def generate_anchor(self, labels, k=9):
annotations = self.prepare_annotations(labels)
clusters = self.kmeans(annotations, k=k)
avg_iou = self.get_avg_iou(annotations, clusters)
print('Average IOU', avg_iou)
anchors = clusters.astype('int').tolist()
anchors = sorted(anchors, key=lambda x: x[0] * x[1])
return anchors
def prepare_annotations(self,labels):
result = []
for i, idx in enumerate(labels):
label = np.loadtxt(idx)
if len(label.shape)==1:
label=label[np.newaxis,:]
#print(label)
assert label.shape[1] == 5, "make sure the labeled objective has xmin,ymin,xmax,ymax,class"
bbox_wh = label[:, 3:5]*416
result.append(bbox_wh)
result = np.concatenate(result, axis=0)
return result
def iou(self, box, clusters):
"""
Calculates the Intersection over Union (IoU) between a box and k clusters.
param:
box: tuple or array, shifted to the origin (i. e. width and height)
clusters: numpy array of shape (k, 2) where k is the number of clusters
return:
numpy array of shape (k, 0) where k is the number of clusters
"""
x = np.minimum(clusters[:, 0], box[0])
y = np.minimum(clusters[:, 1], box[1])
if np.count_nonzero(x == 0) > 0 or np.count_nonzero(y == 0) > 0:
raise ValueError("Box has no area")
intersection = x * y
box_area = box[0] * box[1]
cluster_area = clusters[:, 0] * clusters[:, 1]
iou_ = np.true_divide(intersection, box_area + cluster_area - intersection + 1e-7)
# iou_ = intersection / (box_area + cluster_area - intersection + 1e-7)
return iou_
def get_avg_iou(self, boxes, clusters):
return np.mean([np.max(self.iou(boxes[i], clusters)) for i in range(boxes.shape[0])])
In [ ]:
anchor = Anchor()
anchors = anchor.generate_anchor(train_labels, k=9)
print(anchors)