355 lines
9.0 KiB
Python
Executable File
355 lines
9.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
|
|
import time
|
|
|
|
import csv
|
|
|
|
import ffmpy
|
|
|
|
from multiprocessing import cpu_count
|
|
|
|
from collections import OrderedDict
|
|
|
|
import json
|
|
|
|
from typing import Union, Any
|
|
|
|
# Benchmark matrix: for every codec, the CRF values and the presets that get
# encoded. libx264 and libx265 use the same CRF ladder and named presets, the
# two AV1 encoders share a CRF ladder and use integer presets.
_X26X_CRF = (10, 14, 16, 18, 20, 22, 25)
_X26X_PRESETS = (
    "superfast",
    "veryfast",
    "faster",
    "fast",
    "medium",
    "slow",
    "slower",
    "veryslow",
)
_AV1_CRF = (20, 25, 30, 35, 40)

# every entry gets its own list objects, so editing one codec never leaks
# into another
encoding: dict[str, Any] = {
    "libx264": {"crf": list(_X26X_CRF), "presets": list(_X26X_PRESETS)},
    "libx265": {"crf": list(_X26X_CRF), "presets": list(_X26X_PRESETS)},
    "libaom-av1": {"crf": list(_AV1_CRF), "presets": [0, 2, 4, 6]},
    "libsvtav1": {"crf": list(_AV1_CRF), "presets": [0, 4, 8, 12]},
}

# program version, used as prefix of the data csv name
# tagging releases keeps test runs reproducible
version = "v0.0.1a3"
|
|
|
|
def now() -> int:
    """
    Get the current Unix timestamp as an integer

    Return:
        int: time.time() with the fractional part cut off (truncated, not rounded)
    """
    timestamp = time.time()
    return int(timestamp)
|
|
|
|
|
|
def write_line(
    codec: str,
    crf: int,
    preset: Union[str, int],
    infile: str,
    outfilesize: float,
    enctime: int,
    vmafmean: float,
    vmafmin: float,
    ssim: float,
    mse: float,
    path: Union[str, None] = None,
):
    """
    Append one result line to the data csv

    Parameters:
        codec (str): Codec used
        crf (int): CRF used
        preset (str/int): Preset used
        infile (str): Input file name
        outfilesize (float): Size of output file
        enctime (int): Time to encode
        vmafmean (float): Mean VMAF score
        vmafmin (float): Min VMAF score
        ssim (float): SSIM Score
        mse (float): MSE Score
        path (str/None): CSV file to append to. When omitted, the module-level
            `datafile` is used, which only exists when this file runs as a script
    """
    # Look the global up only when no explicit path was passed, so the function
    # also works when this module is imported and `datafile` was never set.
    target = datafile if path is None else path

    row = (
        codec,
        crf,
        preset,
        infile,
        outfilesize,
        enctime,
        vmafmean,
        vmafmin,
        ssim,
        mse,
    )

    # newline="" lets the csv module control the line endings itself
    with open(target, "a", newline="") as file:
        csv.writer(file).writerow(row)
|
|
|
|
|
|
def encode_general(
    inputfile: str, outputfile: str, codec: str, crf: int, preset: Union[str, int]
):
    """
    Prepare the ffmpeg call for codecs that need no extra options

    Parameters:
        inputfile (str): Path to input file
        outputfile (str): Path to output file
        codec (str): Value passed to -c:v
        crf (int): Value passed to -crf
        preset (str/int): Value passed to -preset

    Return:
        ffmpy.FFmpeg object, not executed yet. The caller has to call run() on it
    """
    # only the first video stream of the input is mapped into the output
    options = f"-c:v {codec} -crf {crf} -preset {preset} -g 240 -map 0:v:0 "

    return ffmpy.FFmpeg(inputs={inputfile: None}, outputs={outputfile: options})
|
|
|
|
|
|
def encode_libaom(inputfile: str, outputfile: str, crf: int, preset: Union[str, int]):
    """
    Prepare the ffmpeg call for libaom-av1, which takes more options than the other codecs

    Parameters:
        inputfile (str): Path to input file
        outputfile (str): Path to output file
        crf (int): Value passed to -crf
        preset (str/int): Value passed to -cpu-used (libaom is not given a -preset)

    Return:
        ffmpy.FFmpeg object, not executed yet. The caller has to call run() on it
    """
    # only the first video stream of the input is mapped into the output
    options = (
        f"-c:v libaom-av1 -crf {crf} -b:v 0 -cpu-used {preset} "
        "-row-mt 1 -tiles 2x2 -g 240 -map 0:v:0 "
    )

    return ffmpy.FFmpeg(inputs={inputfile: None}, outputs={outputfile: options})
|
|
|
|
|
|
def score_vmaf(outputfile: str, inputfile: str) -> dict[str, float]:
    """
    Run ffmpeg's libvmaf filter on an encode and read back the pooled VMAF values

    ffmpeg writes its report to vmaf.json in the current working directory,
    the file is parsed afterwards and left in place.

    Parameters:
        outputfile (str): Path to output file (the encode)
        inputfile (str): Path to input file (the source it was encoded from)

    Return:
        dict[str, float]: Keys "mean" and "min", taken from pooled_metrics.vmaf of the report
    """
    # the filter gets one thread per CPU reported by cpu_count()
    vmaf_options = (
        "-filter_complex "
        f"libvmaf=log_fmt=json:n_threads={cpu_count()}:log_path=vmaf.json"
        " -f null"
    )

    # the order of the inputs is kept: encode first, source second
    sources = OrderedDict()
    sources[outputfile] = None
    sources[inputfile] = None

    ffmpy.FFmpeg(inputs=sources, outputs={"-": vmaf_options}).run()

    with open("vmaf.json", "r") as report:
        pooled = json.load(report)["pooled_metrics"]["vmaf"]

    return {"mean": pooled["mean"], "min": pooled["min"]}
|
|
|
|
|
|
def parse_kv_files(inputfile: str) -> list[dict[str, Any]]:
    """
    Parse a stats file made of whitespace separated key:value tokens

    Used for the ssim.log and psnr.log files written by ffmpeg. Tokens without
    a colon, like the trailing "(26.088579)" on ssim lines, are ignored. Lines
    that hold no key:value token at all (e.g. blank lines) are skipped.

    Parameters:
        inputfile (str): Path to the stats file

    Return:
        list[dict[str, Any]]: One dict per line, keys and values are kept as strings
    """
    lines: list[dict[str, Any]] = []

    with open(inputfile) as file:
        for line in file:
            # Select by shape instead of dropping the last token blindly, this
            # keeps a trailing pair such as psnr_v:87.80.
            # maxsplit=1 keeps values intact that contain a colon themselves.
            linedict = dict(
                token.split(":", 1) for token in line.split() if ":" in token
            )

            # an empty dict would only cause a KeyError in the callers
            if linedict:
                lines.append(linedict)

    return lines
|
|
|
|
|
|
def score_ssim(outputfile: str, inputfile: str) -> float:
    """
    Calculate a file's average SSIM value

    Runs ffmpeg's ssim filter, which writes its stats to ssim.log in the
    current working directory, and averages the "All" value of every line.

    Parameters:
        outputfile (str): Path to output file
        inputfile (str): Path to input file

    Return:
        float: Mean of the "All" values in ssim.log

    Raises:
        ValueError: ssim.log holds no entries, so there is nothing to average
    """
    ff = ffmpy.FFmpeg(
        inputs=OrderedDict([(outputfile, None), (inputfile, None)]),
        outputs={"-": "-lavfi ssim=stats_file=ssim.log -f null"},
    )

    ff.run()

    # one dict per line of the stats file
    entries: list[dict[str, Any]] = parse_kv_files("ssim.log")

    # fail with a clear message instead of a bare ZeroDivisionError
    if not entries:
        raise ValueError("ssim.log contains no SSIM entries")

    # get ssim average
    return sum(float(entry["All"]) for entry in entries) / len(entries)
|
|
|
|
|
|
def score_psnr(outputfile: str, inputfile: str) -> float:
    """
    Calculate a file's MSE (mean-square error) using PSNR. A lower value is better

    Runs ffmpeg's psnr filter, which writes its stats to psnr.log in the
    current working directory, and averages the "mse_avg" value of every line.

    Parameters:
        outputfile (str): Path to output file
        inputfile (str): Path to input file

    Return:
        float: Mean of the "mse_avg" values in psnr.log

    Raises:
        ValueError: psnr.log holds no entries, so there is nothing to average
    """
    ff = ffmpy.FFmpeg(
        inputs=OrderedDict([(outputfile, None), (inputfile, None)]),
        outputs={"-": "-lavfi psnr=stats_file=psnr.log -f null"},
    )

    ff.run()

    # one dict per line of the stats file
    entries: list[dict[str, Any]] = parse_kv_files("psnr.log")

    # fail with a clear message instead of a bare ZeroDivisionError
    if not entries:
        raise ValueError("psnr.log contains no PSNR entries")

    # get mse average
    return sum(float(entry["mse_avg"]) for entry in entries) / len(entries)
|
|
|
|
|
|
if __name__ == "__main__":
    # exist_ok avoids the check-then-create gap of isdir() followed by mkdir()
    os.makedirs("encodes", exist_ok=True)

    # write_line() appends to this module-level name, it has to be set before
    # the first encode finishes
    datafile = f"{version}-data-{now()}.csv"

    # start the csv with the header line
    with open(datafile, "w", newline="") as file:
        csv.writer(file).writerow(
            (
                "Codec",
                "CRF",
                "Preset",
                "Input file",
                "Output file size (MiB)",
                "Encode time (s)",
                "VMAF Score (mean)",
                "VMAF Score (min)",
                "SSIM Score",
                "MSE Score",
            )
        )

    inputfile = "source/Sparks_in_Blender.webm"

    # Derive both names from inputfile, so the csv column and the names of the
    # encodes follow automatically when the source file is changed.
    inputname = os.path.basename(inputfile)
    inputstem = os.path.splitext(inputname)[0]

    # encode and score every codec/crf/preset combination
    for codec, options in encoding.items():
        for crf in options["crf"]:
            for preset in options["presets"]:
                outputfile = os.path.join(
                    "encodes",
                    f"{inputstem}-codec_{codec}-crf_{crf}-preset_{preset}.mkv",
                )

                # libaom needs additional options
                if codec == "libaom-av1":
                    ff = encode_libaom(
                        inputfile=inputfile,
                        outputfile=outputfile,
                        crf=crf,
                        preset=preset,
                    )
                else:
                    ff = encode_general(
                        inputfile=inputfile,
                        outputfile=outputfile,
                        codec=codec,
                        crf=crf,
                        preset=preset,
                    )

                # execute previously defined encoding settings
                # The monotonic clock is not affected by system clock changes.
                # Rounding the real duration is more accurate than subtracting
                # two timestamps that were truncated to whole seconds.
                starttime = time.monotonic()
                ff.run()
                difftime = round(time.monotonic() - starttime)

                # bytes -> MiB
                outputfilesize = os.path.getsize(outputfile) / 1024 / 1024

                vmaf = score_vmaf(outputfile=outputfile, inputfile=inputfile)

                ssim = score_ssim(outputfile=outputfile, inputfile=inputfile)

                mse = score_psnr(outputfile=outputfile, inputfile=inputfile)

                write_line(
                    codec=codec,
                    crf=crf,
                    preset=preset,
                    infile=inputname,
                    outfilesize=outputfilesize,
                    enctime=difftime,
                    vmafmean=vmaf["mean"],
                    vmafmin=vmaf["min"],
                    ssim=ssim,
                    mse=mse,
                )

                # TODO make removal optional/togglable with cli switch
                os.remove(outputfile)
|