|
| 1 | +import copy |
| 2 | +import numpy as np |
| 3 | +import cv2 |
| 4 | +from glob import glob |
| 5 | +import os |
| 6 | +import argparse |
| 7 | +import json |
| 8 | + |
| 9 | +# video file processing setup |
| 10 | +# from: https://stackoverflow.com/a/61927951 |
| 11 | +import argparse |
| 12 | +import subprocess |
| 13 | +import sys |
| 14 | +from pathlib import Path |
| 15 | +from typing import NamedTuple |
| 16 | + |
| 17 | + |
| 18 | +class FFProbeResult(NamedTuple): |
| 19 | + return_code: int |
| 20 | + json: str |
| 21 | + error: str |
| 22 | + |
| 23 | + |
| 24 | +def ffprobe(file_path) -> FFProbeResult: |
| 25 | + command_array = ["ffprobe", |
| 26 | + "-v", "quiet", |
| 27 | + "-print_format", "json", |
| 28 | + "-show_format", |
| 29 | + "-show_streams", |
| 30 | + file_path] |
| 31 | + result = subprocess.run(command_array, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) |
| 32 | + return FFProbeResult(return_code=result.returncode, |
| 33 | + json=result.stdout, |
| 34 | + error=result.stderr) |
| 35 | + |
| 36 | + |
| 37 | +# openpose setup |
| 38 | +from src import model |
| 39 | +from src import util |
| 40 | +from src.body import Body |
| 41 | +from src.hand import Hand |
| 42 | + |
| 43 | +body_estimation = Body('model/body_pose_model.pth') |
| 44 | +hand_estimation = Hand('model/hand_pose_model.pth') |
| 45 | + |
| 46 | +def process_frame(frame, body=True, hands=True): |
| 47 | + canvas = copy.deepcopy(frame) |
| 48 | + if body: |
| 49 | + candidate, subset = body_estimation(frame) |
| 50 | + canvas = util.draw_bodypose(canvas, candidate, subset) |
| 51 | + if hands: |
| 52 | + hands_list = util.handDetect(candidate, subset, frame) |
| 53 | + all_hand_peaks = [] |
| 54 | + for x, y, w, is_left in hands_list: |
| 55 | + peaks = hand_estimation(frame[y:y+w, x:x+w, :]) |
| 56 | + peaks[:, 0] = np.where(peaks[:, 0]==0, peaks[:, 0], peaks[:, 0]+x) |
| 57 | + peaks[:, 1] = np.where(peaks[:, 1]==0, peaks[:, 1], peaks[:, 1]+y) |
| 58 | + all_hand_peaks.append(peaks) |
| 59 | + canvas = util.draw_handpose(canvas, all_hand_peaks) |
| 60 | + return canvas |
| 61 | + |
| 62 | +# writing video with ffmpeg because cv2 writer failed |
| 63 | +# https://stackoverflow.com/questions/61036822/opencv-videowriter-produces-cant-find-starting-number-error |
| 64 | +import ffmpeg |
| 65 | + |
| 66 | +# open specified video |
| 67 | +parser = argparse.ArgumentParser( |
| 68 | + description="Process a video annotating poses detected.") |
| 69 | +parser.add_argument('file', type=str, help='Video file location to process.') |
| 70 | +parser.add_argument('--no_hands', action='store_true', help='No hand pose') |
| 71 | +parser.add_argument('--no_body', action='store_true', help='No body pose') |
| 72 | +args = parser.parse_args() |
| 73 | +video_file = args.file |
| 74 | +cap = cv2.VideoCapture(video_file) |
| 75 | + |
| 76 | +# get video file info |
| 77 | +ffprobe_result = ffprobe(args.file) |
| 78 | +info = json.loads(ffprobe_result.json) |
| 79 | +videoinfo = [i for i in info["streams"] if i["codec_type"] == "video"][0] |
| 80 | +input_fps = videoinfo["avg_frame_rate"] |
| 81 | +# input_fps = float(input_fps[0])/float(input_fps[1]) |
| 82 | +input_pix_fmt = videoinfo["pix_fmt"] |
| 83 | +input_vcodec = videoinfo["codec_name"] |
| 84 | + |
| 85 | +# define a writer object to write to a movidified file |
| 86 | +postfix = info["format"]["format_name"].split(",")[0] |
| 87 | +output_file = ".".join(video_file.split(".")[:-1])+".processed." + postfix |
| 88 | + |
| 89 | + |
| 90 | +class Writer(): |
| 91 | + def __init__(self, output_file, input_fps, input_framesize, input_pix_fmt, |
| 92 | + input_vcodec): |
| 93 | + if os.path.exists(output_file): |
| 94 | + os.remove(output_file) |
| 95 | + self.ff_proc = ( |
| 96 | + ffmpeg |
| 97 | + .input('pipe:', |
| 98 | + format='rawvideo', |
| 99 | + pix_fmt="bgr24", |
| 100 | + s='%sx%s'%(input_framesize[1],input_framesize[0]), |
| 101 | + r=input_fps) |
| 102 | + .output(output_file, pix_fmt=input_pix_fmt, vcodec=input_vcodec) |
| 103 | + .overwrite_output() |
| 104 | + .run_async(pipe_stdin=True) |
| 105 | + ) |
| 106 | + |
| 107 | + def __call__(self, frame): |
| 108 | + self.ff_proc.stdin.write(frame.tobytes()) |
| 109 | + |
| 110 | + def close(self): |
| 111 | + self.ff_proc.stdin.close() |
| 112 | + self.ff_proc.wait() |
| 113 | + |
| 114 | + |
| 115 | +writer = None |
| 116 | +while(cap.isOpened()): |
| 117 | + ret, frame = cap.read() |
| 118 | + if frame is None: |
| 119 | + break |
| 120 | + |
| 121 | + posed_frame = process_frame(frame, body=not args.no_body, |
| 122 | + hands=not args.no_hands) |
| 123 | + |
| 124 | + if writer is None: |
| 125 | + input_framesize = posed_frame.shape[:2] |
| 126 | + writer = Writer(output_file, input_fps, input_framesize, input_pix_fmt, |
| 127 | + input_vcodec) |
| 128 | + |
| 129 | + cv2.imshow('frame', posed_frame) |
| 130 | + |
| 131 | + # write the frame |
| 132 | + writer(posed_frame) |
| 133 | + |
| 134 | + if cv2.waitKey(1) & 0xFF == ord('q'): |
| 135 | + break |
| 136 | + |
| 137 | +cap.release() |
| 138 | +writer.close() |
| 139 | +cv2.destroyAllWindows() |
0 commit comments