configs/music-normalize/main.py
exu 10cf2f455d Complete rewrite with pyloudnorm
Based on
    https://ffmpeg.org/pipermail/ffmpeg-user/2024-March/057775.html
    loudnorm is a low quality filter.
    Some research gave some alternatives.
    One of these is "pyloudnorm", a native python module implementing
    EBU R 128 loudness normalization.
    This is what I have implemented now instead of the old ffmpeg
    filter.

    In the future additional rewrites to use the official FLAC and OPUS
    encoders might be desirable for better compression.
2024-03-24 12:57:59 +01:00

239 lines
6.7 KiB
Python
Executable File

#!/usr/bin/env python3
# multithreading
import multiprocessing
# audio format conversions
import ffmpy
# argument parsing
import argparse
# multiprocessing stuff
from multiprocessing import Pool
# executing some commands
import subprocess
# file/directory handling
import os
# most recent starttime for program
import time
# randomness
from random import randint
# typing hints
from typing import Any, Optional
import tempfile
# working with sound files
import soundfile
# loudness normalization
import pyloudnorm
"""
Normalize loudness of all music files in a given directory and its subdirectories.
"""
musicfile_extensions = (".flac", ".wav", ".mp3", ".m4a", ".aac", ".opus")
def loudnorm(inputfile: str, outputfile: str):
"""
Normalize audio to EBU R 128 standard using pyloudnorm
Parameters:
inputfile (str): Path to input file. Format must be supported by python-soundfile module
outputfile (str): Path to output file
"""
data, rate = soundfile.read(file=inputfile)
# measure loudness
meter = pyloudnorm.Meter(rate=rate)
loudness = meter.integrated_loudness(data=data)
# normalize audio
file_normalized = pyloudnorm.normalize.loudness(
data=data, input_loudness=loudness, target_loudness=-30.0
)
# write normalized audio to file
soundfile.write(file=outputfile, data=file_normalized, samplerate=rate)
def ffmpeg_to_wav(inputfile: str, outputfile: str):
"""
Convert a file into .wav for further processing
Parameters:
inputfile (str): Path to input file
outputfile (str): Path to output file
"""
# convert to wav in temporary directory
with tempfile.TemporaryDirectory() as tempdir:
# temporary input file
temp_input: str = os.path.join(
tempdir, os.path.splitext(os.path.basename(inputfile))[0] + ".wav"
)
# temporary output file
temp_output: str = os.path.join(
tempdir,
"normalized",
os.path.splitext(os.path.basename(inputfile))[0] + ".wav",
)
os.mkdir(os.path.join(tempdir, "normalized"))
# convert audio to wav
ff = ffmpy.FFmpeg(
inputs={inputfile: None}, outputs={temp_input: None}, global_options=("-y")
)
subprocess.run(ff.cmd, shell=True, capture_output=True)
# normalize loudness
loudnorm(inputfile=temp_input, outputfile=temp_output)
# convert audio back to lossy format
outputcmd = {
outputfile: "-c:a libopus" " " "-b:a 192k" " " "-compression_level 10"
}
ff = ffmpy.FFmpeg(
inputs={temp_output: None}, outputs=outputcmd, global_options=("-y")
)
subprocess.run(ff.cmd, shell=True, capture_output=True)
def main(inputfile: str) -> Optional[list[Any]]:
"""
Main program loop
Parameters:
inputfile (str): Path to input file
Output:
dynamically normalised audio files (list)
"""
# set output folder to parent path + "normalized"
outputfolder = os.path.join(os.path.dirname(inputfile), "normalized")
# NOTE create output folder
# because multiple parallel processes are at work here,
# there might be conflicts with one trying to create the directory although it already exists
# this while loop makes sure the directory does exist
# the try/except block ensures the error is caught and (hopefully) doesn't happen again just after with random sleep
# there's very likely a better way to do this, idk
while not os.path.isdir(outputfolder):
try:
os.mkdir(outputfolder)
except:
time.sleep(randint(0, 4))
# output file path
infile_noextension: str = os.path.splitext(os.path.basename(inputfile))[0]
infile_extension: str = os.path.splitext(os.path.basename(inputfile))[1]
match infile_extension:
case ".flac" | ".wav":
print("Working on", inputfile)
outputfile: str = os.path.join(outputfolder, infile_noextension + ".flac")
# direct conversion start
loudnorm(inputfile=inputfile, outputfile=outputfile)
print("Completed", inputfile)
case ".mp3" | ".m4a" | ".aac" | ".opus":
print("Working on", inputfile)
outputfile: str = os.path.join(outputfolder, infile_noextension + ".opus")
# conversion is started within the ffmpeg_to_wav function
ffmpeg_to_wav(inputfile=inputfile, outputfile=outputfile)
print("Completed", inputfile)
case _:
print(
inputfile,
"does not use a known extension. This error shouldn't be happening actually",
)
return
if __name__ == "__main__":
"""
Handle arguments and other details for interactive usage
"""
# start time of program
starttime = time.time()
parser = argparse.ArgumentParser(description="")
# Input directory
parser.add_argument(
"-i", "--input-dir", required=True, type=str, help="Input source directory"
)
# number of cpus/threads to use, defaults to all available
parser.add_argument(
"-c",
"--cpu-count",
required=False,
type=int,
help="Number of cpu cores",
default=multiprocessing.cpu_count(),
)
# in case you wanted to rerun the conversion for everything
parser.add_argument(
"-r",
"--reset",
required=False,
action="store_true",
help="Rerun conversion for all files",
)
args = parser.parse_args()
srcfolder = args.input_dir
cpu = args.cpu_count
reset = args.reset
# file where last run timestamp is stored
timefile = os.path.join(srcfolder, "run.time")
# list of non-linear normalizations
nonlinear_all: Optional[list[Any]] = []
# get time of previous run
if reset:
timeprev = 0
elif os.path.isfile(timefile):
with open(timefile, "r") as file:
timeprev = file.read()
else:
timeprev = 0
musicfiles: list[str] = []
for root, dirs, files in os.walk(srcfolder):
# ignore the "normalized" subfolder
dirs[:] = [d for d in dirs if d not in ["normalized"]]
for file in files:
if file.endswith(musicfile_extensions):
filepath = os.path.join(root, file)
# only file newer than the last run are added
if os.path.getmtime(filepath) >= float(timeprev):
musicfiles.append(os.path.join(root, file))
with Pool(cpu) as p:
nonlinear_all: Optional[list[Any]] = p.map(main, musicfiles)
# write this run's time into file
with open(timefile, "w") as file:
file.write(str(starttime))