forked from kamilstanuch/Autocrop-vertical
-
Notifications
You must be signed in to change notification settings - Fork 571
Expand file tree
/
Copy patheditor.py
More file actions
376 lines (314 loc) · 16.6 KB
/
editor.py
File metadata and controls
376 lines (314 loc) · 16.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
import os
import json
import re
import subprocess
import time
from google import genai
from google.genai import types
class VideoEditor:
def __init__(self, api_key):
self.client = genai.Client(api_key=api_key)
self.model_name = "gemini-3-flash-preview"
def upload_video(self, video_path):
"""Uploads video to Gemini File API."""
print(f"📤 Uploading {video_path} to Gemini...")
# Ensure we are passing a path that exists
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path}")
# Using 'file' keyword instead of 'path'
try:
file_upload = self.client.files.upload(file=video_path)
except Exception as e:
print(f"❌ Gemini Upload Error: {e}")
raise e
# Wait for processing
print("⏳ Waiting for video processing by Gemini...")
while True:
file_info = self.client.files.get(name=file_upload.name)
if file_info.state == "ACTIVE":
print("✅ Video processed and ready.")
return file_upload
elif file_info.state == "FAILED":
raise Exception("Video processing failed by Gemini.")
time.sleep(2)
def get_ffmpeg_filter(self, video_file_obj, duration, fps=30, width=None, height=None, transcript=None):
"""Asks Gemini for a raw FFmpeg filter string."""
if width is None or height is None:
# Keep prompt usable even if caller didn't pass dimensions.
width, height = 1080, 1920
transcript_text = json.dumps(transcript) if transcript else "Not available."
prompt = f"""
You are an expert FFmpeg video editor. Your task is to generate a complex video filter string to make a short video viral, BUT ONLY apply effects where they make sense contextually.
Video Duration: {duration} seconds.
Video FPS: {fps}
Video Resolution (MUST KEEP EXACT): {width}x{height}
TRANSCRIPT (Context of what is being said):
{transcript_text}
Goal: Enhance the video with dynamic zooms, cuts (simulated with punch-ins), and visual effects to increase retention, but DO NOT overdo it. Random effects are bad. Contextual effects are good.
Instructions:
1. ANALYZE THE VIDEO AND TRANSCRIPT: Understand the mood, the pacing, and the key moments.
2. APPLY EFFECTS ONLY WHEN RELEVANT:
- Use "punch-in" zooms (zoompan) to emphasize key points, jokes, or dramatic moments in the speech.
- slow zooms to face when the speaker is speaking
- Use visual effects (contrast, saturation, sharpness) to highlight mood changes or specific segments.
- If nothing significant is happening, keep it simple. It is BETTER to have no effect than a random/distracting one.
- Avoid constant motion if the speaker is delivering a serious or steady message.
3. Create a single valid FFmpeg filter complex string (for the -vf flag).
4. Use filters like `zoompan`, `eq` (contrast), `hue` (saturation/bw), `unsharp`.
5. Pacing: Align effects with the rhythm of the speech (from transcript) or visual action.
6. CRITICAL SYNTAX RULES:
- DO NOT use comparison operators like `<`, `>`, `<=`, `>=` anywhere. They frequently break FFmpeg expression parsing.
- USE FFmpeg expression FUNCTIONS instead:
- `between(x,a,b)`
- `lt(x,y)`, `lte(x,y)`, `gt(x,y)`, `gte(x,y)`
- `if(cond,then,else)`
- Always wrap expression values in single quotes: `z='...'`, `x='...'`, `y='...'`, `enable='...'`.
- FOR `zoompan`:
- Prefer `on` (output frame index) to avoid time-variable quirks.
- Convert seconds to frames using FPS={fps}: `frame = seconds * {fps}`.
- Use `between(on, startFrame, endFrame)` for segmenting and pacing.
- Example:
`zoompan=z='1.1*between(on,0,75)+1.3*between(on,76,150)+1.15*between(on,151,300)+1.2*gte(on,301)'`
- ALWAYS set zoompan output size to EXACT `{width}x{height}` using `s={width}x{height}`.
- ALWAYS set `fps={fps}` and `d=1`.
- DO NOT use `scale`, `crop`, `pad` unless you keep EXACT `{width}x{height}` (no aspect ratio changes).
- FOR `eq`, `hue`, `curves`, `unsharp` (Visual Effects):
- **DO NOT** use dynamic expressions for parameter values (e.g. `contrast='1+0.5*t'`).
- **USE TIMELINE EDITING** via the `enable` option.
- Create MULTIPLE filter instances for different time ranges.
- **SYNTAX FOR ENABLE:**
- **USE** `between(t,start,end)` for clarity and robustness.
- **USE** single quotes around the enable expression.
- **Example:** `eq=contrast=1.2:enable='between(t,0,3)'`
- **Example:** `hue=s=0:enable='between(t,10,12)'`
- This is much safer and robust than boolean multiplication.
Constraints:
- Output JSON with a single key: "filter_string".
- The value must be the RAW filter string ready to be passed to `-vf`.
- OUTPUT MUST KEEP EXACT RESOLUTION AND ASPECT RATIO: {width}x{height}.
- Do NOT output 1280x720 or 1080x1080 unless the input is exactly that.
- IMPORTANT: Do NOT include the `-vf` flag itself, just the filter content.
- IMPORTANT: Ensure syntax is correct for FFmpeg.
Output JSON:
{{
"filter_string": "..."
}}
"""
print("🤖 Asking Gemini for FFmpeg filter...")
response = self.client.models.generate_content(
model=self.model_name,
contents=[video_file_obj, prompt],
config=types.GenerateContentConfig(
response_mime_type="application/json"
)
)
print(f"🔍 DEBUG: Gemini Raw Response:\n{response.text}")
try:
# Clean response text (remove potential markdown blocks)
text = response.text
if text.startswith("```json"):
text = text[7:]
elif text.startswith("```"):
text = text[3:]
if text.endswith("```"):
text = text[:-3]
text = text.strip()
# Additional cleanup for potential trailing characters outside JSON
# Find the first '{' and last '}'
start_idx = text.find('{')
end_idx = text.rfind('}')
if start_idx != -1 and end_idx != -1:
text = text[start_idx:end_idx+1]
print(f"🔍 DEBUG: Cleaned JSON Text:\n{text}")
return json.loads(text)
except json.JSONDecodeError:
print(f"❌ Failed to parse JSON: {response.text}")
return None
def get_effects_config(self, video_file_obj, duration, fps=30, width=None, height=None, transcript=None):
"""Asks Gemini for a structured EffectsConfig JSON for Remotion rendering."""
if width is None or height is None:
width, height = 1080, 1920
transcript_text = json.dumps(transcript) if transcript else "Not available."
prompt = f"""
You are an expert video editor analyzing a video and its transcript to generate dynamic visual effects for a Remotion-based renderer.
Video Duration: {duration} seconds.
Video FPS: {fps}
Video Resolution: {width}x{height}
TRANSCRIPT (Context of what is being said):
{transcript_text}
Your task is to produce a structured JSON describing time-based effect segments that cover the FULL video duration.
Each segment has these fields:
- "startSec" (number): Start time in seconds.
- "endSec" (number): End time in seconds.
- "zoom" (number): Zoom level. 1.0 = no zoom, max 1.5. Use subtle values like 1.05-1.2 for most cases.
- "zoomCenterX" (number): Horizontal focus point for zoom, 0.0 (left) to 1.0 (right). 0.5 = center.
- "zoomCenterY" (number): Vertical focus point for zoom, 0.0 (top) to 1.0 (bottom). 0.5 = center.
- "brightness" (number): Brightness multiplier. 1.0 = normal. Range 0.8-1.2.
- "contrast" (number): Contrast multiplier. 1.0 = normal. Range 0.8-1.3.
- "saturate" (number): Saturation multiplier. 1.0 = normal. Range 0.8-1.3.
Instructions:
1. ANALYZE the video content and transcript to understand mood, pacing, and key moments.
2. Apply CONTEXTUAL effects aligned with speech and action:
- Use slow, subtle zooms toward the speaker's face during speaking moments.
- Emphasize key moments, punchlines, or dramatic beats with slightly stronger zoom or contrast.
- Keep transitions smooth — avoid jarring jumps between segments.
- If nothing significant is happening, keep values at defaults (zoom 1.0, all multipliers 1.0).
3. Segments MUST cover the entire video duration from 0 to {duration} seconds with no gaps.
4. Prefer fewer, longer segments with gradual changes over many rapid short segments.
5. Output ONLY valid JSON, no explanations.
Output format:
{{
"segments": [
{{
"startSec": 0,
"endSec": 3.5,
"zoom": 1.0,
"zoomCenterX": 0.5,
"zoomCenterY": 0.5,
"brightness": 1.0,
"contrast": 1.0,
"saturate": 1.0
}}
]
}}
"""
print("🤖 Asking Gemini for Remotion effects config...")
response = self.client.models.generate_content(
model=self.model_name,
contents=[video_file_obj, prompt],
config=types.GenerateContentConfig(
response_mime_type="application/json"
)
)
print(f"🔍 DEBUG: Gemini Raw Response:\n{response.text}")
try:
# Clean response text (remove potential markdown blocks)
text = response.text
if text.startswith("```json"):
text = text[7:]
elif text.startswith("```"):
text = text[3:]
if text.endswith("```"):
text = text[:-3]
text = text.strip()
# Find the first '{' and last '}'
start_idx = text.find('{')
end_idx = text.rfind('}')
if start_idx != -1 and end_idx != -1:
text = text[start_idx:end_idx+1]
print(f"🔍 DEBUG: Cleaned JSON Text:\n{text}")
return json.loads(text)
except json.JSONDecodeError:
print(f"❌ Failed to parse effects config JSON: {response.text}")
return None
@staticmethod
def _split_filter_chain(filter_string: str) -> list[str]:
"""Split a -vf filter chain on commas, respecting single-quoted substrings."""
parts: list[str] = []
start = 0
in_quote = False
for i, ch in enumerate(filter_string):
if ch == "'":
in_quote = not in_quote
elif ch == "," and not in_quote:
parts.append(filter_string[start:i])
start = i + 1
parts.append(filter_string[start:])
return parts
@classmethod
def _enforce_zoompan_output_size(cls, filter_string: str, width: int, height: int) -> str:
"""Force any zoompan filter to output the same geometry as the input clip."""
parts = cls._split_filter_chain(filter_string)
out_parts: list[str] = []
for part in parts:
if "zoompan=" in part:
# Force s=WxH inside zoompan options (digitsxdigits only).
if re.search(r":s=\d+x\d+", part):
part = re.sub(r":s=\d+x\d+", f":s={width}x{height}", part)
else:
part = f"{part}:s={width}x{height}"
out_parts.append(part)
return ",".join(out_parts)
@staticmethod
def _sanitize_filter_string(filter_string: str) -> str:
"""
Best-effort sanitizer for Gemini-generated FFmpeg expressions.
Converts comparison operators (t<3, on>=75, etc.) into FFmpeg expr functions (lt(), gte(), ...),
which are far more reliably parsed across FFmpeg builds.
"""
s = filter_string
# Order matters: handle >= / <= before > / <
patterns: list[tuple[re.Pattern[str], str]] = [
(re.compile(r"(?<![A-Za-z0-9_])([A-Za-z_]\w*)\s*>=\s*(-?\d+(?:\.\d+)?)"), r"gte(\1,\2)"),
(re.compile(r"(?<![A-Za-z0-9_])([A-Za-z_]\w*)\s*<=\s*(-?\d+(?:\.\d+)?)"), r"lte(\1,\2)"),
(re.compile(r"(?<![A-Za-z0-9_])([A-Za-z_]\w*)\s*>\s*(-?\d+(?:\.\d+)?)"), r"gt(\1,\2)"),
(re.compile(r"(?<![A-Za-z0-9_])([A-Za-z_]\w*)\s*<\s*(-?\d+(?:\.\d+)?)"), r"lt(\1,\2)"),
]
for pat, repl in patterns:
s = pat.sub(repl, s)
return s
def apply_edits(self, input_path, output_path, filter_data):
"""Executes FFmpeg with the generated filter."""
if not filter_data or "filter_string" not in filter_data:
print("⚠️ No filter string found. Copying original.")
subprocess.run(['ffmpeg', '-y', '-i', input_path, '-c', 'copy', output_path])
return
filter_string = filter_data["filter_string"]
# Get input dimensions so we can enforce geometry (avoid broken aspect ratios).
try:
probe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height', '-of', 'csv=s=x:p=0', input_path]
res_out = subprocess.check_output(probe_cmd, env={**os.environ, "LANG": "C.UTF-8"}).decode().strip()
w, h = map(int, res_out.split('x'))
except Exception as e:
print(f"⚠️ Could not probe resolution: {e}")
w, h = None, None
# Sanitize common expression pitfalls (e.g., t<3 / on>=75) before executing FFmpeg.
sanitized = self._sanitize_filter_string(filter_string)
if sanitized != filter_string:
print("🧼 Sanitized AI Filter (converted comparisons to lt/lte/gt/gte functions)")
print(f"🧼 Before: {filter_string}")
print(f"🧼 After: {sanitized}")
filter_string = sanitized
# Enforce zoompan output size to preserve aspect ratio / resolution.
if w and h:
enforced = self._enforce_zoompan_output_size(filter_string, w, h)
if enforced != filter_string:
print(f"📐 Enforced zoompan output size to {w}x{h}")
filter_string = enforced
# Ensure square pixels (avoid weird display stretching in some players).
if "setsar=" not in filter_string:
filter_string = f"{filter_string},setsar=1"
print(f"🎬 Executing AI Filter: {filter_string}")
cmd = [
'ffmpeg', '-y',
'-i', input_path,
'-vf', filter_string,
'-c:v', 'libx264', '-preset', 'fast', '-crf', '22',
'-c:a', 'copy',
output_path
]
# Use explicit environment with UTF-8 to avoid ascii errors in subprocess
env = os.environ.copy()
# On some minimal docker images, we need to ensure we use a UTF-8 locale
# Try C.UTF-8 first, fallback to en_US.UTF-8 if available, but C.UTF-8 is usually safer for minimal
env["LANG"] = "C.UTF-8"
env["LC_ALL"] = "C.UTF-8"
try:
# We must encode arguments if filesystem is ascii but we have unicode chars
# But subprocess in Python 3 handles unicode args by encoding them with os.fsencode().
# If sys.getfilesystemencoding() is ascii, this fails.
# We can't change fs encoding at runtime easily.
# Workaround: pass bytes directly? subprocess allows bytes in args.
# Convert command elements to bytes assuming utf-8 if they are strings
cmd_bytes = []
for arg in cmd:
if isinstance(arg, str):
cmd_bytes.append(arg.encode('utf-8'))
else:
cmd_bytes.append(arg)
subprocess.run(cmd_bytes, check=True, env=env)
except subprocess.CalledProcessError as e:
print(f"❌ FFmpeg failed: {e}")
raise e
if __name__ == "__main__":
pass