471 lines
12 KiB
Python
Executable File
471 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
|
|
import time
|
|
|
|
import csv
|
|
|
|
import ffmpy
|
|
|
|
import argparse
|
|
|
|
from multiprocessing import cpu_count
|
|
|
|
from collections import OrderedDict
|
|
|
|
import json
|
|
|
|
from typing import Union, Any
|
|
|
|
# encoding options used
# Every codec is benchmarked with each combination of its "crf" and "presets"
# values. The codec keys are passed to FFmpeg as the video encoder name.
encoding: dict[str, Any] = {
    "libx264": {
        "crf": [10, 14, 16, 18, 20, 22, 25],
        "presets": [
            "superfast",
            "veryfast",
            "faster",
            "fast",
            "medium",
            "slow",
            "slower",
            "veryslow",
        ],
    },
    "libx265": {
        "crf": [10, 14, 16, 18, 20, 22, 25],
        "presets": [
            "superfast",
            "veryfast",
            "faster",
            "fast",
            "medium",
            "slow",
            "slower",
            "veryslow",
        ],
    },
    # the AV1 encoders use numeric presets: libaom receives them as -cpu-used,
    # libsvtav1 receives them as -preset
    "libaom-av1": {"crf": [20, 25, 30, 35, 40], "presets": [2, 3, 4, 6]},
    "libsvtav1": {"crf": [20, 25, 30, 35, 40], "presets": [4, 6, 8, 10]},
}
|
|
|
|
# program version
# make tests reproducible by tag
# (the version string is used as prefix of the data CSV file name)
version = "v0.0.3"
|
|
|
|
def now():
    """
    Return the current Unix timestamp as an integer

    The fractional part of time.time() is cut off by int(), so the value is
    truncated to whole seconds rather than rounded to the nearest one.
    """
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch)
|
|
|
|
|
|
def cleanup(keepencodes: bool, outputfile: str = ""):
    """
    Clean up files on program exit/interruption

    Removes the temporary metric logs "vmaf.json", "ssim.log" and "psnr.log"
    from the current working directory and, unless keepencodes is set, the
    encoded output file as well. Removal is best effort: a file that does not
    exist or cannot be deleted is skipped without raising.

    Parameters:
        keepencodes (bool): Keep the encoded output file when True
        outputfile (str): Path to the encoded file. An empty value means there is no encode to delete
    """
    leftovers = ["vmaf.json", "ssim.log", "psnr.log"]

    # only touch the encode when deletion is wanted and a path was really given
    if not keepencodes and outputfile:
        leftovers.insert(0, outputfile)

    for path in leftovers:
        try:
            os.remove(path)
        except OSError:
            # Missing or undeletable files are fine here. Catching OSError
            # only (instead of everything) lets KeyboardInterrupt and
            # SystemExit pass through.
            pass
|
|
|
|
|
|
def write_line(
    datafile: str,
    codec: str,
    crf: int,
    preset: Union[str, int],
    infile: str,
    outfilesize: float,
    enctime: int,
    vmafmean: float,
    vmafmin: float,
    ssim: float,
    mse: float,
):
    """
    Append one result row to the data CSV

    The file is opened in append mode, so existing rows are kept. The column
    order is exactly the parameter order after datafile.

    Parameters:
        datafile (str): Path to output CSV filename
        codec (str): Codec used
        crf (int): CRF used
        preset (str/int): Preset used
        infile (str): Input file name
        outfilesize (float): Size of output file
        enctime (int): Time to encode
        vmafmean (float): Mean VMAF score
        vmafmin (float): Min VMAF score
        ssim (float): SSIM Score
        mse (float): MSE Score
    """
    row = [
        codec,
        crf,
        preset,
        infile,
        outfilesize,
        enctime,
        vmafmean,
        vmafmin,
        ssim,
        mse,
    ]

    # newline="" leaves line ending handling to the csv module
    with open(datafile, "a", newline="") as handle:
        csv.writer(handle).writerow(row)
|
|
|
|
|
|
def encode_general(
    inputfile: str, outputfile: str, codec: str, crf: int, preset: Union[str, int]
):
    """
    Build the FFmpeg command for a CRF/preset based encode

    The command is only constructed here, it is not executed. The caller has
    to invoke run() on the returned object.

    Parameters:
        inputfile (str): Path to input file
        outputfile (str): Path to output file
        codec (str): Codec used, passed to FFmpeg as -c:v
        crf (int): CRF value
        preset (str/int): Chosen preset

    Return:
        ffmpy.FFmpeg: The prepared command
    """
    # -g 240 sets the keyframe interval, -map 0:v:0 selects the first video
    # stream of the input
    option_template = (
        "-c:v {videocodec} -crf {crf} -preset {preset} -g 240 -map 0:v:0 "
    )
    output_options = option_template.format(
        videocodec=codec, crf=crf, preset=preset
    )

    return ffmpy.FFmpeg(
        inputs={inputfile: None},
        outputs={outputfile: output_options},
    )
|
|
|
|
|
|
def encode_libaom(inputfile: str, outputfile: str, crf: int, preset: Union[str, int]):
    """
    Build the FFmpeg command for an encode with libaom-av1

    libaom gets its own builder because it takes the preset as -cpu-used and
    is given extra options (-b:v 0, -row-mt 1, -tiles 2x2). The command is
    only constructed here; the caller has to invoke run() on it.

    Parameters:
        inputfile (str): Path to input file
        outputfile (str): Path to output file
        crf (int): CRF value
        preset (str/int): Chosen preset, used as -cpu-used value

    Return:
        ffmpy.FFmpeg: The prepared command
    """
    option_template = "-c:v libaom-av1 -crf {crf} -b:v 0 -cpu-used {preset} -row-mt 1 -tiles 2x2 -g 240 -map 0:v:0 "
    output_options = option_template.format(crf=crf, preset=preset)

    return ffmpy.FFmpeg(
        inputs={inputfile: None},
        outputs={outputfile: output_options},
    )
|
|
|
|
|
|
def score_vmaf(outputfile: str, inputfile: str) -> dict[str, float]:
    """
    Calculate a file's VMAF score. Higher is better

    Runs FFmpeg with the libvmaf filter, which writes its report to
    "vmaf.json" in the current working directory, and reads the pooled
    mean/min values back from that report.

    Parameters:
        outputfile (str): Path to output file (the encode to be rated)
        inputfile (str): Path to input file (the source it is compared to)

    Return:
        dict[str, float]: VMAF mean and min value under the keys "mean" and "min"
    """
    # input order matters for the filter: the encode first, the source second
    sources = OrderedDict()
    sources[outputfile] = None
    sources[inputfile] = None

    filter_template = "-filter_complex libvmaf=log_fmt=json:n_threads={cputhreads}:log_path=vmaf.json -f null"
    filter_options = filter_template.format(cputhreads=cpu_count())

    ffmpy.FFmpeg(inputs=sources, outputs={"-": filter_options}).run()

    with open("vmaf.json", "r") as report:
        pooled = json.load(report)["pooled_metrics"]["vmaf"]

    return {"mean": pooled["mean"], "min": pooled["min"]}
|
|
|
|
|
|
def parse_kv_files(inputfile: str) -> list[dict[str, Any]]:
    """
    Parse a stats file whose lines consist of whitespace separated "key:value" tokens

    Used for the ssim.log and psnr.log files written by FFmpeg. Every token
    that contains a colon becomes one dict entry (split at the first colon).
    Tokens without a colon, such as the trailing "(26.088579)" in SSIM logs,
    are ignored. Lines that yield no key/value pair at all are skipped.

    Parameters:
        inputfile (str): Path to the stats file

    Return:
        list[dict[str, Any]]: One dict per non-empty line, values are kept as strings
    """
    # create list of dicts. Each dict is one line in the file
    lines: list[dict[str, Any]] = []

    with open(inputfile) as file:
        for line in file:
            linedict: dict[str, Any] = {}
            for token in line.split():
                # partition never raises, unlike unpacking the result of split(":")
                key, separator, value = token.partition(":")
                if separator:
                    linedict[key] = value

            # blank lines would otherwise produce empty dicts and break key
            # lookups in the callers
            if linedict:
                lines.append(linedict)

    return lines
|
|
|
|
|
|
def score_ssim(outputfile: str, inputfile: str):
    """
    Calculate a file's average SSIM value

    Runs FFmpeg with the ssim filter, which writes its statistics to
    "ssim.log" in the current working directory, and averages the "All"
    value over every parsed line of that log. An empty log leads to a
    ZeroDivisionError.

    Parameters:
        outputfile (str): Path to output file (the encode to be rated)
        inputfile (str): Path to input file (the source it is compared to)

    Return:
        float: Mean of the "All" values found in ssim.log
    """
    sources = OrderedDict([(outputfile, None), (inputfile, None)])
    command = ffmpy.FFmpeg(
        inputs=sources,
        outputs={"-": "-lavfi ssim=stats_file=ssim.log -f null"},
    )
    command.run()

    # one dict per line of the stats file
    entries: list[dict[str, Any]] = parse_kv_files("ssim.log")

    # accumulate the overall SSIM value of every entry
    running_total: float = 0.0
    for entry in entries:
        running_total = running_total + float(entry["All"])

    return running_total / len(entries)
|
|
|
|
|
|
def score_psnr(outputfile: str, inputfile: str) -> float:
    """
    Calculate a file's MSE (mean-square error) using PSNR. A lower value is better

    Runs FFmpeg with the psnr filter, which writes its statistics to
    "psnr.log" in the current working directory, and averages the "mse_avg"
    value over every parsed line of that log. An empty log leads to a
    ZeroDivisionError.

    Parameters:
        outputfile (str): Path to output file (the encode to be rated)
        inputfile (str): Path to input file (the source it is compared to)

    Return:
        float: Mean of the "mse_avg" values found in psnr.log
    """
    sources = OrderedDict([(outputfile, None), (inputfile, None)])
    command = ffmpy.FFmpeg(
        inputs=sources,
        outputs={"-": "-lavfi psnr=stats_file=psnr.log -f null"},
    )
    command.run()

    # one dict per line of the stats file
    entries: list[dict[str, Any]] = parse_kv_files("psnr.log")

    # accumulate the mse_avg value of every entry
    running_total: float = 0.0
    for entry in entries:
        running_total = running_total + float(entry["mse_avg"])

    return running_total / len(entries)
|
|
|
|
|
|
def _log_error(*parts: Any):
    """Append one line to error.log, built from parts joined by single spaces."""
    with open("error.log", "a") as file:
        file.write(" ".join(str(part) for part in parts) + "\n")


def _score(scorer: Any, label: str, outputfile: str, inputfile: str, fallback: Any):
    """Run one scoring function; on an FFmpeg failure log it and return fallback."""
    try:
        return scorer(outputfile=outputfile, inputfile=inputfile)
    except ffmpy.FFRuntimeError:
        _log_error("FFMPEG error. To calculate " + label + " score of", outputfile)
        return fallback


def _encode_and_score(
    datafile: str,
    inputfile: str,
    outputfile: str,
    codec: str,
    crf: int,
    preset: Union[str, int],
):
    """Encode inputfile with one codec/crf/preset combination, score it and append the CSV row."""
    # libaom needs additional options
    if codec == "libaom-av1":
        ff = encode_libaom(
            inputfile=inputfile,
            outputfile=outputfile,
            crf=crf,
            preset=preset,
        )
    else:
        ff = encode_general(
            inputfile=inputfile,
            outputfile=outputfile,
            codec=codec,
            crf=crf,
            preset=preset,
        )

    # execute previously defined encoding settings
    starttime = now()
    try:
        ff.run()
    except ffmpy.FFRuntimeError:
        _log_error("FFMPEG error. Failed encoding", inputfile, "to", outputfile)
        # without a finished encode there is nothing to measure or score
        return
    difftime = now() - starttime

    outputfilesize = os.path.getsize(outputfile) / 1024 / 1024

    # a metric that could not be calculated is recorded as NaN, so the row
    # still carries the size, the encode time and the metrics that worked
    missing = float("nan")
    vmaf = _score(
        score_vmaf, "VMAF", outputfile, inputfile, {"mean": missing, "min": missing}
    )
    ssim = _score(score_ssim, "SSIM", outputfile, inputfile, missing)
    mse = _score(score_psnr, "MSE", outputfile, inputfile, missing)

    write_line(
        datafile=datafile,
        codec=codec,
        crf=crf,
        preset=preset,
        infile=os.path.basename(inputfile),
        outfilesize=outputfilesize,
        enctime=difftime,
        vmafmean=vmaf["mean"],
        vmafmin=vmaf["min"],
        ssim=ssim,
        mse=mse,
    )


def main(inputfile: str, outputpath: str = "encodes", keepencodes: bool = False):
    """
    Main program function so this program can be used from the command line or as a library import.

    Encodes the input file once for every codec/CRF/preset combination of the
    module level "encoding" table and appends one row per encode to a CSV
    file named "<version>-data-<timestamp>.csv" in the current working
    directory. FFmpeg failures are appended to "error.log". A failed encode
    is skipped, and a failed metric is written as NaN.

    Parameters:
        inputfile (str): Path to input video file

    Optional Parameters:
        outputpath (str): Path to output folder. Defaults to "encodes" in the current working directory
        keepencodes (bool): Keep the encoded files instead of deleting them after scoring. Defaults to False
    """
    # makedirs also creates missing parent folders and accepts an existing one
    os.makedirs(outputpath, exist_ok=True)

    datafile = version + "-data-" + str(now()) + ".csv"

    inputfilename = os.path.basename(os.path.splitext(inputfile)[0])

    with open(datafile, "w", newline="") as file:
        write = csv.writer(file)
        write.writerow(
            (
                "Codec",
                "CRF",
                "Preset",
                "Input file",
                "Output file size (MiB)",
                "Encode time (s)",
                "VMAF Score (mean)",
                "VMAF Score (min)",
                "SSIM Score",
                "MSE Score",
            )
        )

    for codec, options in encoding.items():
        for crf in options["crf"]:
            for preset in options["presets"]:
                # TODO selection of output location with arguments?
                outputfile = os.path.join(
                    outputpath,
                    f"{inputfilename}-codec_{codec}-crf_{crf}-preset_{preset}.mkv",
                )

                try:
                    _encode_and_score(
                        datafile=datafile,
                        inputfile=inputfile,
                        outputfile=outputfile,
                        codec=codec,
                        crf=crf,
                        preset=preset,
                    )
                finally:
                    # also runs on errors and Ctrl+C, so a partial encode and
                    # the metric logs never stay behind
                    cleanup(keepencodes=keepencodes, outputfile=outputfile)
|
|
|
|
|
|
if __name__ == "__main__":
    # command line interface: -i is mandatory, -o and -k are optional
    parser = argparse.ArgumentParser(description="")

    parser.add_argument(
        "-o",
        "--output-dir",
        type=str,
        default="encodes",
        required=False,
        help='Output directory for encodes. Default is "encodes" in current working directory',
    )
    parser.add_argument(
        "-i",
        "--input-file",
        type=str,
        required=True,
        help="Input file to encode",
    )
    parser.add_argument(
        "-k",
        "--keep-encodes",
        action="store_true",
        required=False,
        help="Don't delete encodes after getting their scores. Default is False (delete)",
    )

    args = parser.parse_args()

    inputfile: str = args.input_file
    outputpath: str = args.output_dir
    keepencodes: bool = args.keep_encodes

    # main program loop with cleanup in case of interrupt
    try:
        main(inputfile, outputpath=outputpath, keepencodes=keepencodes)
    except KeyboardInterrupt:
        # No output path is passed here, so this call only clears the
        # temporary metric logs from the working directory.
        cleanup(keepencodes=keepencodes)
|