Satellite video stabilization and resampling is essentially the same problem as band registration and alignment in hyperspectral imagery. It consists of four main steps: extracting feature points, matching corresponding (tie) points, estimating an affine (or translation) model, and resampling the image. Compared with hyperspectral band alignment, satellite video stabilization is actually a bit easier, for two reasons: first, consecutive video frames overlap considerably, so tie points are easy to extract; second, video frames do not suffer from the hyperspectral problem of the same ground object looking different in different spectral bands.

Both video stabilization and band alignment can follow one of two strategies: a fixed reference frame, or an iterative scheme. With a fixed reference frame, one frame (or band) is designated as the reference and all other frames (bands) are resampled into its extent. In the iterative scheme, the result of the previous resampling step serves as the reference for the next one, the following frame is resampled against it, and so on. For hyperspectral band registration, the advantage of the iterative scheme is that tie points are easier to match, since the spectral change between adjacent bands is usually small and similar ground features are easier to find. For video stabilization, the advantage of the iterative scheme is that the entire video can be resampled. For example, suppose the first frame of a video overlaps with frame 50 but not with frame 51 or any later frame. With a fixed reference frame, resampling can only reach frame 50, because later frames share no tie points with the reference and no model can be estimated. Since the scene changes slowly and continuously, adjacent frames are guaranteed to overlap, which makes the iterative scheme more robust for video resampling.
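The fixed-reference-frame scheme is what the script in the next section implements. For the iterative scheme, a minimal sketch is given below. It assumes a helper match_frames(img_a, img_b) that returns two arrays of corresponding points (the FLANN_SURF function from the next section could play this role); the function and variable names are illustrative and not part of the original script. Instead of matching each frame against a previously resampled result, this variant matches consecutive original frames and chains the affine models, so every frame is warped only once into the geometry of the first frame.

# coding=utf-8
# Minimal sketch of the iterative (frame-to-frame) scheme; names are illustrative.
import cv2
import numpy as np

def stabilize_iterative(frames, match_frames):
    base = frames[0]
    h, w = base.shape[:2]
    # Accumulated transform from the current frame back to frame 0,
    # kept as a 3x3 matrix so consecutive 2x3 affine models can be chained.
    accum = np.eye(3)
    stabilized = [base]
    for prev, cur in zip(frames[:-1], frames[1:]):
        pts_prev, pts_cur = match_frames(prev, cur)
        # Affine model mapping the current frame into the previous frame
        model, mask = cv2.estimateAffine2D(np.array(pts_cur), np.array(pts_prev))
        if model is None:
            break  # no usable model between adjacent frames; stop here
        accum = np.dot(accum, np.vstack([model, [0.0, 0.0, 1.0]]))
        # Warp the current frame directly into the geometry of frame 0
        stabilized.append(cv2.warpAffine(cur, accum[:2, :], (w, h)))
    return stabilized

Because only adjacent frames need to overlap, the loop never runs out of tie points even for long sequences, which is exactly the robustness argument made above.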
1. Code
SURF is used here for feature extraction, FLANN matching is used to screen the corresponding points, and an affine model is estimated for image resampling. The fixed-reference-frame method is used rather than the iterative one.
# coding=utf-8
import cv2
import numpy as np
import os
import sys
def drawMatches(img1, img2, good_matches):
    # Convert grayscale inputs to BGR so matches can be drawn in color
    if img1.shape.__len__() == 2:
        img1 = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
    if img2.shape.__len__() == 2:
        img2 = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
    # Place the two images side by side on one canvas
    img_out = np.zeros([max(img1.shape[0], img2.shape[0]), img1.shape[1] + img2.shape[1], 3], np.uint8)
    img_out[:img1.shape[0], :img1.shape[1], :] = img1
    img_out[:img2.shape[0], img1.shape[1]:, :] = img2
    # Draw every match as two circles connected by a line
    for match in good_matches:
        pt1 = (int(match[0]), int(match[1]))
        pt2 = (int(match[2] + img1.shape[1]), int(match[3]))
        cv2.circle(img_out, pt1, 5, (0, 0, 255), 1, cv2.LINE_AA)
        cv2.circle(img_out, pt2, 5, (0, 0, 255), 1, cv2.LINE_AA)
        cv2.line(img_out, pt1, pt2, (0, 0, 255), 1, cv2.LINE_AA)
    return img_out
def FLANN_SURF(img1, img2, threshold=2000):
    good_matches = []
    good_kps1 = []
    good_kps2 = []
    good_out = []
    good_out_kp1 = []
    good_out_kp2 = []
    # Create a SURF object with the given Hessian threshold
    surf = cv2.xfeatures2d.SURF_create(hessianThreshold=threshold)
    # Detect keypoints and compute SURF descriptors
    kp1, des1 = surf.detectAndCompute(img1, None)
    kp2, des2 = surf.detectAndCompute(img2, None)
    if len(kp1) < 3 or len(kp2) < 3:
        print("Not enough keypoints, keypoint number is less than 3.")
        img_out = np.zeros([max(img1.shape[0], img2.shape[0]), img1.shape[1] + img2.shape[1], 3], np.uint8)
        img_out[:img1.shape[0], :img1.shape[1], :] = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
        img_out[:img2.shape[0], img1.shape[1]:, :] = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
        return good_out_kp1, good_out_kp2, good_out, img_out
    else:
        print("kp1 size:" + len(kp1).__str__() + "," + "kp2 size:" + len(kp2).__str__())
        # FLANN parameters
        FLANN_INDEX_KDTREE = 0
        index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
        search_params = dict(checks=50)  # or pass empty dictionary
        flann = cv2.FlannBasedMatcher(index_params, search_params)
        matches = flann.knnMatch(des1, des2, k=2)
        # Screen matches with the ratio test
        for i, (m, n) in enumerate(matches):
            if m.distance < 0.5 * n.distance:
                good_matches.append(matches[i])
                good_kps1.append(kp1[matches[i][0].queryIdx])
                good_kps2.append(kp2[matches[i][0].trainIdx])
        if good_matches.__len__() == 0:
            print("Not enough good matches.")
            img_show1 = np.zeros([img1.shape[0], img1.shape[1], 3], np.uint8)
            img_show2 = np.zeros([img2.shape[0], img2.shape[1], 3], np.uint8)
            cv2.drawKeypoints(img1, kp1, img_show1)
            cv2.drawKeypoints(img2, kp2, img_show2)
            img_out = np.zeros([max(img1.shape[0], img2.shape[0]), img1.shape[1] + img2.shape[1], 3], np.uint8)
            img_out[:img1.shape[0], :img1.shape[1], :] = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
            img_out[:img2.shape[0], img1.shape[1]:, :] = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
            return good_out_kp1, good_out_kp2, good_out, img_out
        else:
            print("good matches:" + good_matches.__len__().__str__())
            for i in range(good_kps1.__len__()):
                good_out_kp1.append([good_kps1[i].pt[0], good_kps1[i].pt[1]])
                good_out_kp2.append([good_kps2[i].pt[0], good_kps2[i].pt[1]])
                good_out.append([good_kps1[i].pt[0], good_kps1[i].pt[1], good_kps2[i].pt[0], good_kps2[i].pt[1]])
            img1_show = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
            img2_show = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
            img3 = drawMatches(img1_show, img2_show, good_out)
            return good_out_kp1, good_out_kp2, good_out, img3
def findAllFiles(root_dir, filter):
    print("Finding files ending with \'" + filter + "\' ...")
    separator = os.path.sep
    paths = []
    names = []
    files = []
    # Walk the directory tree and collect files with the given suffix
    for parent, dirname, filenames in os.walk(root_dir):
        for filename in filenames:
            if filename.endswith(filter):
                paths.append(parent + separator)
                names.append(filename)
    # Sort path/name pairs together so that indices stay aligned
    pairs = sorted(zip(paths, names))
    paths = [pair[0] for pair in pairs]
    names = [pair[1] for pair in pairs]
    for i in range(paths.__len__()):
        print(paths[i] + names[i])
        files.append(paths[i] + names[i])
    print(names.__len__().__str__() + " files have been found.")
    return paths, names, files
def resampleFrame(base_path, resample_path, name, out_path):
    base_img_gray = cv2.imread(base_path, cv2.IMREAD_GRAYSCALE)
    resample_img = cv2.imread(resample_path)
    resample_img_gray = cv2.cvtColor(resample_img, cv2.COLOR_BGR2GRAY)
    print("load " + name + " success.")
    print("matching " + name + " with surf features.")
    kps1, kps2, matches, img = FLANN_SURF(base_img_gray, resample_img_gray)
    if kps1.__len__() < 3:
        print("Not enough match points in " + name)
        out = np.zeros([base_img_gray.shape[0], base_img_gray.shape[1], 3], np.uint8)
        cv2.imwrite(out_path, out)
        return
    else:
        print("match " + name + " success.")
        # Estimate the affine model that maps the frame to be resampled onto the base frame
        affine_matrix, mask = cv2.estimateAffine2D(np.array(kps2), np.array(kps1))
        if affine_matrix is None:
            print("build affine model for " + name + " failed.")
            out = np.zeros([base_img_gray.shape[0], base_img_gray.shape[1], 3], np.uint8)
            cv2.imwrite(out_path, out)
            return
        else:
            print("build affine model for " + name + " success.")
            print(affine_matrix)
            print("resample b band in " + name)
            resampled_img_b = cv2.warpAffine(resample_img[:, :, 0],
                                             affine_matrix,
                                             (base_img_gray.shape[1],
                                              base_img_gray.shape[0]))
            print("resample g band in " + name)
            resampled_img_g = cv2.warpAffine(resample_img[:, :, 1],
                                             affine_matrix,
                                             (base_img_gray.shape[1],
                                              base_img_gray.shape[0]))
            print("resample r band in " + name)
            resampled_img_r = cv2.warpAffine(resample_img[:, :, 2],
                                             affine_matrix,
                                             (base_img_gray.shape[1],
                                              base_img_gray.shape[0]))
            out = cv2.merge((resampled_img_b, resampled_img_g, resampled_img_r))
            print("saving image...")
            cv2.imwrite(out_path, out)
            print("finish.")
if sys.argv.__len__() == 2 and sys.argv[1] == "help":
    print("Resample video frames\n")
    print("Command:")
    print("Mode 1: scriptname.py [search_dir] [base_frame_path] [input_file_type] [output_file_type]")
    print("Mode 2: scriptname.py [search_dir] [base_frame_path] [input_file_type] [output_file_type] [out_path]")
    print("For example:")
    print("Mode 1: python resample.py D:\\frames D:\\frames\\frame50\\frame50.tif _L1A.tif .tif")
    print("Mode 2: python resample.py D:\\frames D:\\frames\\frame50\\frame50.tif _L1A.tif .tif D:\\out")
elif sys.argv.__len__() == 5:
    path = sys.argv[1]
    base_img_path = sys.argv[2]
    file_type = sys.argv[3]
    output_type = sys.argv[4]
    if output_type[0] != '.':
        output_type = '.' + output_type
    out_path = []
    paths, names, files = findAllFiles(path, file_type)
    for item in files:
        out_path.append(item.split('.')[0] + "_resample" + output_type)
    if sys.version_info.major < 3:
        flag = raw_input(files.__len__().__str__() + " frames in total." + "\nStart resample?y/n\n")
    else:
        flag = input(files.__len__().__str__() + " frames in total." + "\nStart resample?y/n\n")
    if flag == 'y':
        for i in range(files.__len__()):
            if files[i] == base_img_path:
                print("base image " + (i + 1).__str__() + "/" + files.__len__().__str__())
                print("-" * 80)
                continue
            resampleFrame(base_img_path, files[i], names[i], out_path[i])
            print("resampled " + (i + 1).__str__() + "/" + files.__len__().__str__())
            print("-" * 80)
        print("All frames were resampled.")
    else:
        exit()
elif sys.argv.__len__() == 6:
    path = sys.argv[1]
    base_img_path = sys.argv[2]
    file_type = sys.argv[3]
    output_type = sys.argv[4]
    output_path = sys.argv[5]
    if output_type[0] != '.':
        output_type = '.' + output_type
    out_path = []
    paths, names, files = findAllFiles(path, file_type)
    for item in names:
        out_path.append(output_path + os.path.sep + item.split('.')[0] + "_resample" + output_type)
    if sys.version_info.major < 3:
        flag = raw_input(files.__len__().__str__() + " frames in total." + "\nStart resample?y/n\n")
    else:
        flag = input(files.__len__().__str__() + " frames in total." + "\nStart resample?y/n\n")
    if flag == 'y':
        for i in range(files.__len__()):
            if files[i] == base_img_path:
                print("base image " + (i + 1).__str__() + "/" + files.__len__().__str__())
                print("-" * 80)
                continue
            resampleFrame(base_img_path, files[i], names[i], out_path[i])
            print("resampled " + (i + 1).__str__() + "/" + files.__len__().__str__())
            print("-" * 80)
        print("All frames were resampled.")
    else:
        exit()
else:
    print("Input \"scriptname.py help\" for help information.")
2. Test Results
The figures above show a video segment without stabilization resampling and the same segment after processing, respectively.
This article is original work by the author. Please do not reproduce it without permission. Thank you.