卫星视频稳像与重采代码

Jun 19,2018   7606 words   28 min


卫星视频稳像与重采本质上与高光谱的波段配准与对齐原理是一样的。主要包含特征点提取、匹配同名点、计算仿射模型(或平移模型)、影像重采四个步骤。 相比于高光谱波段对准,卫星视频稳像还简单一些。主要因为一是卫星视频帧间重叠范围较大,易提取同名点,二是视频影像中不存在高光谱中不同谱段同一地物显示不同的问题。 此外,卫星视频稳像和波段对准都有两大类方案,一类是固定帧,一类是迭代。 所谓固定帧即指指定某帧(波段)为基准,将其它帧(波段)都重采到基准帧(波段)范围。 而迭代法则是将上一次重采后的结果作为下一次重采的基准帧,重采下下帧,以此类推。 对于高光谱波段配准而言,采用迭代法的好处是更容易匹配到同名点。因为一般认为相邻波段间光谱变化相对较小,更容易找到相似地物。 对于视频稳像而言,采用迭代法的好处是可以完整地对整段视频进行重采。例如某视频第一帧和第50帧有重叠区域,但和第51帧及后续帧没有重叠区域。 如果采用固定帧法,只能重采到50帧,因为后续帧没有同名点,无法计算模型进行重采。 由于认为变化是连续缓慢的,因此相邻帧之间是一定会有重叠区域的,基于此采用迭代法在视频重采中会更加稳健。

1.代码

这里采用SURF作为特征点,采用FLANN作为同名点筛选条件,建立仿射模型进行影像重采。这里采用固定帧方法,并没有采用迭代法。

# coding=utf-8
import cv2
import numpy as np
import os
import sys


def drawMatches(img1, img2, good_matches):
    if img1.shape.__len__() == 2:
        img1 = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
    if img2.shape.__len__() == 2:
        img2 = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
    img_out = np.zeros([max(img1.shape[0], img2.shape[0]), img1.shape[1] + img2.shape[1], 3], np.uint8)
    img_out[:img1.shape[0], :img1.shape[1], :] = img1
    img_out[:img2.shape[0], img2.shape[1]:, :] = img2
    for match in good_matches:
        pt1 = (int(match[0]), int(match[1]))
        pt2 = (int(match[2] + img1.shape[1]), int(match[3]))
        cv2.circle(img_out, pt1, 5, (0, 0, 255), 1, cv2.LINE_AA)
        cv2.circle(img_out, pt2, 5, (0, 0, 255), 1, cv2.LINE_AA)
        cv2.line(img_out, pt1, pt2, (0, 0, 255), 1, cv2.LINE_AA)
    return img_out


def FLANN_SURF(img1, img2, threshold=2000):
    good_matches = []
    good_kps1 = []
    good_kps2 = []

    good_out = []
    good_out_kp1 = []
    good_out_kp2 = []

    # 新建SIFT对象,参数默认
    surf = cv2.xfeatures2d_SURF.create(hessianThreshold=threshold)
    # 调用函数进行SIFT提取
    kp1, des1 = cv2.xfeatures2d_SURF.detectAndCompute(surf, img1, None)
    kp2, des2 = cv2.xfeatures2d_SURF.detectAndCompute(surf, img2, None)

    if len(kp1) < 3 or len(kp2) < 3:
        print("No enough keypoints, keypoint number is less than 3.")
        img_out = np.zeros([max(img1.shape[0], img2.shape[0]), img1.shape[1] + img2.shape[1], 3], np.uint8)
        img_out[:img1.shape[0], :img1.shape[1], :] = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
        img_out[:img2.shape[0], img2.shape[1]:, :] = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
        return good_out_kp1, good_out_kp2, good_out, img_out
    else:
        print("kp1 size:" + len(kp1).__str__() + "," + "kp2 size:" + len(kp2).__str__())

    # FLANN parameters
    FLANN_INDEX_KDTREE = 0
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=50)  # or pass empty dictionary

    flann = cv2.FlannBasedMatcher(index_params, search_params)
    matches = flann.knnMatch(des1, des2, k=2)

    # 筛选
    for i, (m, n) in enumerate(matches):
        if m.distance < 0.5 * n.distance:
            good_matches.append(matches[i])
            good_kps1.append(kp1[matches[i][0].queryIdx])
            good_kps2.append(kp2[matches[i][0].trainIdx])

    if good_matches.__len__() == 0:
        print("No enough good matches.")
        img_show1 = np.zeros([img1.shape[0], img1.shape[1], 3], np.uint8)
        img_show2 = np.zeros([img2.shape[0], img2.shape[1], 3], np.uint8)
        cv2.drawKeypoints(img1, kp1, img_show1)
        cv2.drawKeypoints(img2, kp2, img_show2)
        img_out = np.zeros([max(img1.shape[0], img2.shape[0]), img1.shape[1] + img2.shape[1], 3], np.uint8)
        img_out[:img1.shape[0], :img1.shape[1], :] = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
        img_out[:img2.shape[0], img2.shape[1]:, :] = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
        return good_out_kp1, good_out_kp2, good_out, img_out
    else:
        print("good matches:" + good_matches.__len__().__str__())
        for i in range(good_kps1.__len__()):
            good_out_kp1.append([good_kps1[i].pt[0], good_kps1[i].pt[1]])
            good_out_kp2.append([good_kps2[i].pt[0], good_kps2[i].pt[1]])
            good_out.append([good_kps1[i].pt[0], good_kps1[i].pt[1], good_kps2[i].pt[0], good_kps2[i].pt[1]])

    img1_show = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
    img2_show = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
    img3 = drawMatches(img1_show, img2_show, good_out)
    return good_out_kp1, good_out_kp2, good_out, img3


def findAllFiles(root_dir, filter):
    print("Finding files ends with \'" + filter + "\' ...")
    separator = os.path.sep
    paths = []
    names = []
    files = []
    # 遍历
    for parent, dirname, filenames in os.walk(root_dir):
        for filename in filenames:
            if filename.endswith(filter):
                paths.append(parent + separator)
                names.append(filename)
    for i in range(paths.__len__()):
        print(paths[i] + names[i])
        files.append(paths[i] + names[i])
    print (names.__len__().__str__() + " files have been found.")
    paths.sort()
    names.sort()
    return paths, names, files


def resampleFrame(base_path, resample_path, name, out_path):
    base_img_gray = cv2.imread(base_path, cv2.IMREAD_GRAYSCALE)
    resample_img = cv2.imread(resample_path)
    resample_img_gray = cv2.cvtColor(resample_img, cv2.COLOR_BGR2GRAY)
    print("load " + name + " success.")
    print("matching " + name + " with surf features.")
    kps1, kps2, matches, img = FLANN_SURF(base_img_gray, resample_img_gray)
    if kps1.__len__() < 3:
        print("No enough match points in " + name)
        out = np.zeros([base_img_gray.shape[0], base_img_gray.shape[1], 3], np.uint8)
        cv2.imwrite(out_path, out)
        return
    else:
        print("match " + name + " success.")
        affine_matrix, mask = cv2.estimateAffine2D(np.array(kps2), np.array(kps1))
        if affine_matrix is None:
            print("build affine model for " + name + " failed.")
            out = np.zeros([base_img_gray.shape[0], base_img_gray.shape[1], 3], np.uint8)
            cv2.imwrite(out_path, out)
            return
        else:
            print("build affine model for " + name + " success.")
            print(affine_matrix)
            print("resample b band in " + name)
            resampled_img_b = cv2.warpAffine(resample_img[:, :, 0],
                                             affine_matrix,
                                             (base_img_gray.shape[1],
                                              base_img_gray.shape[0]))
            print("resample g band in " + name)
            resampled_img_g = cv2.warpAffine(resample_img[:, :, 1],
                                             affine_matrix,
                                             (base_img_gray.shape[1],
                                              base_img_gray.shape[0]))
            print("resample r band in " + name)
            resampled_img_r = cv2.warpAffine(resample_img[:, :, 2],
                                             affine_matrix,
                                             (base_img_gray.shape[1],
                                              base_img_gray.shape[0]))
            out = cv2.merge((resampled_img_b, resampled_img_g, resampled_img_r))
            print("saving image...")
            cv2.imwrite(out_path, out)
            print("finish.")


if sys.argv.__len__() == 2 and sys.argv[1] == "help":
    print("Resample video frames\n")
    print("Command:")
    print("Mode 1: scriptname.py [search_dir] [base_frame_path] [input_file_type] [output_file_type]")
    print("Mode 2: scriptname.py [search_dir] [base_frame_path] [input_file_type] [output_file_type] [out_path]")
    print("For example:")
    print("Mode 1: python resample.py D:\\frames D:\\frames\\frame50\\frame50.tif _L1A.tif .tif")
    print("Mode 2: python resample.py D:\\frames D:\\frames\\frame50\\frame50.tif _L1A.tif .tif D:\\out")
elif sys.argv.__len__() == 5:
    path = sys.argv[1]
    base_img_path = sys.argv[2]
    file_type = sys.argv[3]
    output_type = sys.argv[4]

    if output_type[0] != '.':
        output_type = '.' + output_type
    out_path = []
    paths, names, files = findAllFiles(path, file_type)
    for item in files:
        out_path.append(item.split('.')[0] + "_resample" + output_type)
    if sys.version_info.major < 3:
        flag = raw_input(files.__len__().__str__() + " frames in total." + "\nStart resample?y/n\n")
    else:
        flag = input(files.__len__().__str__() + " frames in total." + "\nStart resample?y/n\n")
    if flag == 'y':
        for i in range(files.__len__()):
            if files[i] == base_img_path:
                print("base image " + (i + 1).__str__() + "/" + files.__len__().__str__())
                print("-" * 80)
                continue
            resampleFrame(base_img_path, files[i], names[i], out_path[i])
            print("resampled " + (i + 1).__str__() + "/" + files.__len__().__str__())
            print("-" * 80)
        print("All frames were resampled.")
    else:
        exit()
elif sys.argv.__len__() == 6:
    path = sys.argv[1]
    base_img_path = sys.argv[2]
    file_type = sys.argv[3]
    output_type = sys.argv[4]
    output_path = sys.argv[5]

    if output_type[0] != '.':
        output_type = '.' + output_type
    out_path = []
    paths, names, files = findAllFiles(path, file_type)
    for item in names:
        out_path.append(output_path + os.path.sep + item.split('.')[0] + "_resample" + output_type)
    if sys.version_info.major < 3:
        flag = raw_input(files.__len__().__str__() + " frames in total." + "\nStart resample?y/n\n")
    else:
        flag = input(files.__len__().__str__() + " frames in total." + "\nStart resample?y/n\n")
    if flag == 'y':
        for i in range(files.__len__()):
            if files[i] == base_img_path:
                print("base image " + (i + 1).__str__() + "/" + files.__len__().__str__())
                print("-" * 80)
                continue
            resampleFrame(base_img_path, files[i], names[i], out_path[i])
            print("resampled " + (i + 1).__str__() + "/" + files.__len__().__str__())
            print("-" * 80)
        print("All frames were resampled.")
    else:
        exit()
else:
    print("Input \"scriptname.py help\" for help information.")

2.测试效果

如上图所示,分别是没有进行稳像重采的一段视频和处理过的一段视频。

本文作者原创,未经许可不得转载,谢谢配合

返回顶部