From ba44bac43517ceb777f8638c5804aa7d5e22119f Mon Sep 17 00:00:00 2001
From: Jan Buethe <jbuethe@amazon.de>
Date: Sat, 22 Jul 2023 13:10:54 -0700
Subject: [PATCH] added testsuite

---
 dnn/torch/testsuite/README.md                 |  46 +++
 .../testsuite/examples/lpcnet_c_example.yml   |   6 +
 .../examples/lpcnet_c_plc_example.yml         |   5 +
 .../examples/lpcnet_torch_example.yml         |   5 +
 dnn/torch/testsuite/requirements.txt          |  10 +
 dnn/torch/testsuite/run_test.py               | 353 ++++++++++++++++++
 dnn/torch/testsuite/utils/__init__.py         |   0
 dnn/torch/testsuite/utils/files.py            |  25 ++
 dnn/torch/testsuite/utils/pesq.py             |  14 +
 dnn/torch/testsuite/utils/pitch.py            |  32 ++
 dnn/torch/testsuite/utils/warpq.py            | 177 +++++++++
 11 files changed, 673 insertions(+)
 create mode 100644 dnn/torch/testsuite/README.md
 create mode 100644 dnn/torch/testsuite/examples/lpcnet_c_example.yml
 create mode 100644 dnn/torch/testsuite/examples/lpcnet_c_plc_example.yml
 create mode 100644 dnn/torch/testsuite/examples/lpcnet_torch_example.yml
 create mode 100644 dnn/torch/testsuite/requirements.txt
 create mode 100644 dnn/torch/testsuite/run_test.py
 create mode 100644 dnn/torch/testsuite/utils/__init__.py
 create mode 100644 dnn/torch/testsuite/utils/files.py
 create mode 100644 dnn/torch/testsuite/utils/pesq.py
 create mode 100644 dnn/torch/testsuite/utils/pitch.py
 create mode 100644 dnn/torch/testsuite/utils/warpq.py

diff --git a/dnn/torch/testsuite/README.md b/dnn/torch/testsuite/README.md
new file mode 100644
index 000000000..cc76965ec
--- /dev/null
+++ b/dnn/torch/testsuite/README.md
@@ -0,0 +1,46 @@
+# lpcnet-testsuite
+
+## setup
+The test script is written for Linux only. It requires sox to be installed and available.
+
+Setup is done as usual via
+
+```
+pip install -r requirements.txt
+```
+
+The test scrip run_warpq_test.py requires a setup file in yaml format, which specifies how
+to generate a wave file OUTPUT from a wave file INPUT sampled resampled to the specified
+sampling rate as a list of shell commands. This makes it easy to test other neural vocoders
+with it as well. Two examples are given in examples. INPUT and OUTPUT will be replaced by using
+the string.format(INPUT=input,OUTPUT=output) method.
+
+Here is one example:
+
+```
+test: "LPCNet reference test"
+processing:
+  - "sox {INPUT} {INPUT}.raw"
+  - "/local/code/LPCNet/lpcnet_demo -features {INPUT}.raw {INPUT}.features.f32"
+  - "/local/code/LPCNet/lpcnet_demo -synthesis {INPUT}.features.f32 {INPUT}.decoded.raw"
+  - "sox -r 16000 -L -e signed-integer -b 16 -c 1 {INPUT}.decoded.raw {OUTPUT}"
+```
+
+The structure of the output folder is as follows:
+
+```
+output_folder
++-- html
+    +-- index.html
+    +-- items
++-- processing
++-- setup.yml
++-- stats.txt
++-- scores.txt
+```
+
+scores.txt contains the WARP-Q scores in descending order (best to worse)
+stats.txt contains mean values over all, the 10 best and the 10 worst items
+setup.yml contains all information to repeat the run
+htms contains a self-contained website displaying the 10 best and 10 worst items
+processing contains processing output
\ No newline at end of file
diff --git a/dnn/torch/testsuite/examples/lpcnet_c_example.yml b/dnn/torch/testsuite/examples/lpcnet_c_example.yml
new file mode 100644
index 000000000..2858309cb
--- /dev/null
+++ b/dnn/torch/testsuite/examples/lpcnet_c_example.yml
@@ -0,0 +1,6 @@
+test: "LPCNet reference test"
+processing:
+  - "sox {INPUT} {INPUT}.raw"
+  - "/local/code/LPCNet/lpcnet_demo -features {INPUT}.raw {INPUT}.features.f32"
+  - "/local/code/LPCNet/lpcnet_demo -synthesis {INPUT}.features.f32 {INPUT}.decoded.raw"
+  - "sox -r 16000 -L -e signed-integer -b 16 -c 1 {INPUT}.decoded.raw {OUTPUT} trim 0.015"
\ No newline at end of file
diff --git a/dnn/torch/testsuite/examples/lpcnet_c_plc_example.yml b/dnn/torch/testsuite/examples/lpcnet_c_plc_example.yml
new file mode 100644
index 000000000..b97b26d1d
--- /dev/null
+++ b/dnn/torch/testsuite/examples/lpcnet_c_plc_example.yml
@@ -0,0 +1,5 @@
+test: "LPCNet reference test"
+processing:
+  - "sox {INPUT} {INPUT}.raw"
+  - "/local/code/LPCNet/lpcnet_demo -plc_file causal {PLCFILE} {INPUT}.raw {INPUT}.decoded.raw"
+  - "sox -r 16000 -L -e signed-integer -b 16 -c 1 {INPUT}.decoded.raw {OUTPUT}"
\ No newline at end of file
diff --git a/dnn/torch/testsuite/examples/lpcnet_torch_example.yml b/dnn/torch/testsuite/examples/lpcnet_torch_example.yml
new file mode 100644
index 000000000..631cbfad6
--- /dev/null
+++ b/dnn/torch/testsuite/examples/lpcnet_torch_example.yml
@@ -0,0 +1,5 @@
+test: "no noise test"
+processing:
+  - "sox {INPUT} {INPUT}.raw"
+  - "/home/ubuntu/bin/lpcnet_dump_data_v2 -test {INPUT}.raw {INPUT}.features.f32"
+  - "/home/ubuntu/opt/miniconda3/envs/torch/bin/python /local/code/lpcnext/test_lpcnet.py {INPUT}.features.f32 /local/experiments/noise_augmentation/output/lpcnet_384_2/checkpoints/checkpoint_epoch_20.pth {OUTPUT}"
\ No newline at end of file
diff --git a/dnn/torch/testsuite/requirements.txt b/dnn/torch/testsuite/requirements.txt
new file mode 100644
index 000000000..09cc2ab91
--- /dev/null
+++ b/dnn/torch/testsuite/requirements.txt
@@ -0,0 +1,10 @@
+scipy
+librosa
+numpy
+scikit-image
+pyvad
+speechpy
+soundfile
+pyyaml
+pesq
+AMFM_decompy
\ No newline at end of file
diff --git a/dnn/torch/testsuite/run_test.py b/dnn/torch/testsuite/run_test.py
new file mode 100644
index 000000000..a397a464b
--- /dev/null
+++ b/dnn/torch/testsuite/run_test.py
@@ -0,0 +1,353 @@
+
+from genericpath import isfile
+import os
+import multiprocessing
+import random
+import subprocess
+import argparse
+import shutil
+
+import yaml
+
+from utils.files import get_wave_file_list
+from utils.warpq import compute_WAPRQ
+from utils.pesq import compute_PESQ
+from utils.pitch import compute_pitch_error
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument('setup', type=str, help='setup yaml specifying end to end processing with model under test')
+parser.add_argument('input_folder', type=str, help='input folder path')
+parser.add_argument('output_folder', type=str, help='output folder path')
+parser.add_argument('--num-testitems', type=int, help="number of testitems to be processed (default 100)", default=100)
+parser.add_argument('--seed', type=int, help='seed for random item selection', default=None)
+parser.add_argument('--fs', type=int, help="sampling rate at which input is presented as wave file (defaults to 16000)", default=16000)
+parser.add_argument('--num-workers', type=int, help="number of subprocesses to be used (default=4)", default=4)
+parser.add_argument('--plc-suffix', type=str, default="_is_lost.txt", help="suffix of plc error pattern file: only relevant if command chain uses PLCFILE (default=_is_lost.txt)")
+parser.add_argument('--metrics', type=str, default='warpq', help='comma separated string of metrics, supported: {{"warpq", "pesq"}}, default="warpq"')
+
+
+def check_for_sox_in_path():
+    r = subprocess.run("sox -h", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+    return r.returncode == 0
+
+
+def run_save_sh(command, verbose=False):
+
+    if verbose:
+        print(f"[run_save_sh] running command {command}...")
+
+    r = subprocess.run(command, shell=True)
+    if r.returncode != 0:
+        raise RuntimeError(f"command '{command}' failed with exit code {r.returncode}")
+
+
+def run_processing_chain(input_path, output_path, model_commands, fs, metrics={'warpq'}, plc_suffix="_is_lost.txt", verbose=False):
+
+    # prepare model input
+    model_input = output_path + ".resamp.wav"
+    run_save_sh(f"sox {input_path} -r {fs} {model_input}", verbose=verbose)
+
+    plcfile = os.path.splitext(input_path)[0] + plc_suffix
+    if os.path.isfile(plcfile):
+        run_save_sh(f"cp {plcfile} {os.path.dirname(output_path)}")
+
+    # generate model output
+    for command in model_commands:
+        run_save_sh(command.format(INPUT=model_input, OUTPUT=output_path, PLCFILE=plcfile), verbose=verbose)
+
+    scores = dict()
+    cache = dict()
+    for metric in metrics:
+        if metric == 'warpq':
+            # run warpq
+            score = compute_WAPRQ(input_path, output_path, sr=fs)
+        elif metric == 'pesq':
+            # run pesq
+            score = compute_PESQ(input_path, output_path, fs=fs)
+        elif metric == 'pitch_error':
+            if metric in cache:
+                score = cache[metric]
+            else:
+                rval = compute_pitch_error(input_path, output_path, fs=fs)
+                score = rval[metric]
+                cache['voicing_error'] = rval['voicing_error']
+        elif metric == 'voicing_error':
+            if metric in cache:
+                score = cache[metric]
+            else:
+                rval = compute_pitch_error(input_path, output_path, fs=fs)
+                score = rval[metric]
+                cache['pitch_error'] = rval['pitch_error']
+        else:
+            ValueError(f'error: unknown metric {metric}')
+
+        scores[metric] = score
+
+    return (output_path, scores)
+
+
+def get_output_path(root_folder, input, output_folder):
+
+    input_relpath = os.path.relpath(input, root_folder)
+
+    os.makedirs(os.path.join(output_folder, 'processing', os.path.dirname(input_relpath)), exist_ok=True)
+
+    output_path = os.path.join(output_folder, 'processing', input_relpath + '.output.wav')
+
+    return output_path
+
+
+def add_audio_table(f, html_folder, results, title, metric):
+
+    item_folder = os.path.join(html_folder, 'items')
+    os.makedirs(item_folder, exist_ok=True)
+
+    # table with results
+    f.write(f"""
+            <div>
+            <h2> {title} </h2>
+            <table>
+            <tr>
+                <th> Rank   </th>
+                <th> Name   </th>
+                <th> {metric.upper()} </th>
+                <th> Audio (out)  </th>
+                <th> Audio (orig)  </th>
+            </tr>
+            """)
+
+    for i, r in enumerate(results):
+        item, score = r
+        item_name = os.path.basename(item)
+        new_item_path = os.path.join(item_folder, item_name)
+        shutil.copyfile(item, new_item_path)
+        shutil.copyfile(item + '.resamp.wav', os.path.join(item_folder, item_name + '.orig.wav'))
+
+        f.write(f"""
+                <tr>
+                    <td> {i + 1} </td>
+                    <td> {item_name.split('.')[0]} </td>
+                    <td> {score:.3f} </td>
+                    <td>
+                        <audio controls>
+                            <source src="items/{item_name}">
+                        </audio>
+                    </td>
+                    <td>
+                        <audio controls>
+                            <source src="items/{item_name + '.orig.wav'}">
+                        </audio>
+                    </td>
+                </tr>
+                """)
+
+    # footer
+    f.write("""
+            </table>
+            </div>
+            """)
+
+
+def create_html(output_folder, results, title, metric):
+
+    html_folder = output_folder
+    items_folder = os.path.join(html_folder, 'items')
+    os.makedirs(html_folder, exist_ok=True)
+    os.makedirs(items_folder, exist_ok=True)
+
+    with open(os.path.join(html_folder, 'index.html'), 'w') as f:
+        # header and title
+        f.write(f"""
+                <!DOCTYPE html>
+                <html lang="en">
+                <head>
+                    <meta charset="utf-8">
+                    <title>{title}</title>
+                    <style>
+                        article {{
+                            align-items: flex-start;
+                            display: flex;
+                            flex-wrap: wrap;
+                            gap: 4em;
+                        }}
+                        html {{
+                            box-sizing: border-box;
+                            font-family: "Amazon Ember", "Source Sans", "Verdana", "Calibri", sans-serif;
+                            padding: 2em;
+                        }}
+                        td {{
+                            padding: 3px 7px;
+                            text-align: center;
+                        }}
+                        td:first-child {{
+                            text-align: end;
+                        }}
+                        th {{
+                            background: #ff9900;
+                            color: #000;
+                            font-size: 1.2em;
+                            padding: 7px 7px;
+                        }}
+                    </style>
+                </head>
+                </body>
+                <h1>{title}</h1>
+                <article>
+                """)
+
+        # top 20
+        add_audio_table(f, html_folder, results[:-21: -1], "Top 20", metric)
+
+        # 20 around median
+        N = len(results) // 2
+        add_audio_table(f, html_folder, results[N + 10 : N - 10: -1], "Median 20", metric)
+
+        # flop 20
+        add_audio_table(f, html_folder, results[:20], "Flop 20", metric)
+
+        # footer
+        f.write("""
+                </article>
+                </body>
+                </html>
+                """)
+
+metric_sorting_signs = {
+    'warpq'         : -1,
+    'pesq'          : 1,
+    'pitch_error'   : -1,
+    'voicing_error' : -1
+}
+
+def is_valid_result(data, metrics):
+    if not isinstance(data, dict):
+        return False
+
+    for metric in metrics:
+        if not metric in data:
+            return False
+
+    return True
+
+
+def evaluate_results(output_folder, results, metric):
+
+    results = sorted(results, key=lambda x : metric_sorting_signs[metric] * x[1])
+    with open(os.path.join(args.output_folder, f'scores_{metric}.txt'), 'w') as f:
+        for result in results:
+            f.write(f"{os.path.relpath(result[0], args.output_folder)} {result[1]}\n")
+
+
+    # some statistics
+    mean = sum([r[1] for r in results]) / len(results)
+    top_mean = sum([r[1] for r in results[-20:]]) / 20
+    bottom_mean = sum([r[1] for r in results[:20]]) / 20
+
+    with open(os.path.join(args.output_folder, f'stats_{metric}.txt'), 'w') as f:
+        f.write(f"mean score: {mean}\n")
+        f.write(f"bottom mean score: {bottom_mean}\n")
+        f.write(f"top mean score: {top_mean}\n")
+
+    print(f"\nmean score: {mean}")
+    print(f"bottom mean score: {bottom_mean}")
+    print(f"top mean score: {top_mean}\n")
+
+    # create output html
+    create_html(os.path.join(output_folder, 'html', metric), results, setup['test'], metric)
+
+if __name__ == "__main__":
+    args = parser.parse_args()
+
+    # check for sox
+    if not check_for_sox_in_path():
+        raise RuntimeError("script requires sox")
+
+
+    # prepare output folder
+    if os.path.exists(args.output_folder):
+        print("warning: output folder exists")
+
+        reply = input('continue? (y/n): ')
+        while reply not in {'y', 'n'}:
+            reply = input('continue? (y/n): ')
+
+        if reply == 'n':
+            os._exit()
+        else:
+            # start with a clean sleight
+            shutil.rmtree(args.output_folder)
+
+    os.makedirs(args.output_folder, exist_ok=True)
+
+    # extract metrics
+    metrics = args.metrics.split(",")
+    for metric in metrics:
+        if not metric in metric_sorting_signs:
+            print(f"unknown metric {metric}")
+            args.usage()
+
+    # read setup
+    print(f"loading {args.setup}...")
+    with open(args.setup, "r") as f:
+        setup = yaml.load(f.read(), yaml.FullLoader)
+
+    model_commands = setup['processing']
+
+    print("\nfound the following model commands:")
+    for command in model_commands:
+        print(command.format(INPUT='input.wav', OUTPUT='output.wav', PLCFILE='input_is_lost.txt'))
+
+    # store setup to output folder
+    setup['input']  = os.path.abspath(args.input_folder)
+    setup['output'] = os.path.abspath(args.output_folder)
+    setup['seed']   = args.seed
+    with open(os.path.join(args.output_folder, 'setup.yml'), 'w') as f:
+        yaml.dump(setup, f)
+
+    # get input
+    print(f"\nCollecting audio files from {args.input_folder}...")
+    file_list = get_wave_file_list(args.input_folder, check_for_features=False)
+    print(f"...{len(file_list)} files found\n")
+
+    # sample from file list
+    file_list = sorted(file_list)
+    random.seed(args.seed)
+    random.shuffle(file_list)
+    num_testitems = min(args.num_testitems, len(file_list))
+    file_list = file_list[:num_testitems]
+
+
+    print(f"\nlaunching test on {num_testitems} items...")
+    # helper function for parallel processing
+    def func(input_path):
+        output_path = get_output_path(args.input_folder, input_path, args.output_folder)
+
+        try:
+            rval = run_processing_chain(input_path, output_path, model_commands, args.fs, metrics=metrics, plc_suffix=args.plc_suffix, verbose=False)
+        except:
+            rval = (input_path, -1)
+
+        return rval
+
+    with multiprocessing.Pool(args.num_workers) as p:
+        results = p.map(func, file_list)
+
+    results_dict = dict()
+    for name, values in results:
+        if is_valid_result(values, metrics):
+            results_dict[name] = values
+
+    print(results_dict)
+
+    # evaluating results
+    num_failures = num_testitems - len(results_dict)
+    print(f"\nprocessing of {num_failures} items failed\n")
+
+    for metric in metrics:
+        print(metric)
+        evaluate_results(
+            args.output_folder,
+            [(name, value[metric]) for name, value in results_dict.items()],
+            metric
+        )
\ No newline at end of file
diff --git a/dnn/torch/testsuite/utils/__init__.py b/dnn/torch/testsuite/utils/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/dnn/torch/testsuite/utils/files.py b/dnn/torch/testsuite/utils/files.py
new file mode 100644
index 000000000..8f6e97087
--- /dev/null
+++ b/dnn/torch/testsuite/utils/files.py
@@ -0,0 +1,25 @@
+import os
+
+
+def get_wave_file_list(parent_folder, extensions=[".wav", ".flac"], check_for_features=False):
+    """ traverses subfolders of parent_folder in search for files that match the given extension """
+
+    file_list = []
+
+    for root, dirs, files in os.walk(parent_folder, topdown=True):
+
+        for file in files:
+
+            stem, ext = os.path.splitext(file)
+
+            #check for extension
+            if not ext in extensions:
+                continue
+
+            # check if feature file exists
+            if check_for_features and not os.path.isfile(os.path.join(root, stem + "_features.f32")):
+                continue
+
+            file_list.append(os.path.join(root, file))
+
+    return file_list
\ No newline at end of file
diff --git a/dnn/torch/testsuite/utils/pesq.py b/dnn/torch/testsuite/utils/pesq.py
new file mode 100644
index 000000000..54cfff84e
--- /dev/null
+++ b/dnn/torch/testsuite/utils/pesq.py
@@ -0,0 +1,14 @@
+import pesq
+import librosa
+
+def compute_PESQ(ref, test, fs=16000):
+
+    if not ref.endswith('.wav') or not test.endswith('.wav'):
+        raise ValueError('error: expecting .wav as file extension')
+
+    ref_item, _ = librosa.load(ref, sr=fs)
+    test_item, _ = librosa.load(test, sr=fs)
+
+    score = pesq.pesq(fs, ref_item, test_item)
+
+    return score
\ No newline at end of file
diff --git a/dnn/torch/testsuite/utils/pitch.py b/dnn/torch/testsuite/utils/pitch.py
new file mode 100644
index 000000000..50edda274
--- /dev/null
+++ b/dnn/torch/testsuite/utils/pitch.py
@@ -0,0 +1,32 @@
+import numpy as np
+from scipy.io import wavfile
+import amfm_decompy.pYAAPT as pYAAPT
+import amfm_decompy.basic_tools as basic
+
+def get_voicing_info(x, sr=16000):
+
+    signal = basic.SignalObj(x, sr)
+    pitch = pYAAPT.yaapt(signal, **{'frame_length' : 20.0, 'tda_frame_length' : 20.0})
+
+    pitch_values = pitch.samp_values
+    voiced_flags = pitch.vuv.astype('float')
+
+    return pitch_values, voiced_flags
+
+def compute_pitch_error(ref_path, test_path, fs=16000):
+    fs_orig, x_orig = wavfile.read(ref_path)
+    fs_test, x_test = wavfile.read(test_path)
+
+    min_length = min(len(x_orig), len(x_test))
+    x_orig = x_orig[:min_length]
+    x_test = x_test[:min_length]
+
+    assert fs_orig == fs_test == fs
+
+    pitch_contour_orig, voicing_orig = get_voicing_info(x_orig.astype(np.float32))
+    pitch_contour_test, voicing_test = get_voicing_info(x_test.astype(np.float32))
+
+    return {
+        'pitch_error' : np.mean(np.abs(pitch_contour_orig - pitch_contour_test)).item(),
+        'voicing_error' : np.sum(np.abs(voicing_orig - voicing_test)).item() / len(voicing_orig)
+        }
\ No newline at end of file
diff --git a/dnn/torch/testsuite/utils/warpq.py b/dnn/torch/testsuite/utils/warpq.py
new file mode 100644
index 000000000..4d5b7877e
--- /dev/null
+++ b/dnn/torch/testsuite/utils/warpq.py
@@ -0,0 +1,177 @@
+
+"""
+WARP-Q: Quality Prediction For Generative Neural Speech Codecs
+
+This is the WARP-Q version used in the ICASSP 2021 Paper:
+
+W. A. Jassim, J. Skoglund, M. Chinen, and A. Hines, “WARP-Q: Quality prediction
+for generative neural speech codecs,” paper accepted for presentation at the 2021 IEEE
+International Conference on Acoustics, Speech and Signal Processing (ICASSP 2021).
+Date of acceptance: 30 Jan 2021. Preprint: https://arxiv.org/pdf/2102.10449
+
+Run using python 3.x and include these package dependencies in your virtual environment:
+    - pandas
+    - librosa
+    - numpy
+    - pyvad
+    - skimage
+    - speechpy
+    - soundfile
+    - scipy (optional)
+    - seaborn (optional, for plotting only)
+    - multiprocessing (optional, for parallel computing mode only)
+    - joblib (optional, for parallel computing mode only)
+
+Input:
+    - The main_test function calls a csv file that contains paths of audio files.
+    - The csv file cosists of four columns:
+        - Ref_Wave: reference speech
+        - Test_Wave: test speech
+        - MOS: subjective score (optinal, for plotting only)
+        - Codec: type of speech codec for the test speech (optinal, for plotting only)
+
+Output:
+    - Code will compute the WARP-Q quality scores between Ref_Wave and Test_Wave,
+    and will store the obrained results in a new column in the same csv file.
+
+
+Releases:
+
+Warning: While this code has been tested and commented giving invalid input
+files may cause unexpected results and will not be caught by robust exception
+handling or validation checking. It will just fail or give you the wrong answer.
+
+In this simple and basic demo, we compute WARP-Q scores for 8 speech samples only.
+More data should should be provided to have better score distributions.
+
+
+(c) Dr Wissam Jassim
+    University College Dublin
+    wissam.a.jassim@gmail.com
+    wissam.jassim@ucd.ie
+    November 28, 2020
+
+"""
+
+# Load libraries
+import librosa, librosa.core, librosa.display
+import numpy as np
+from pyvad import vad
+from skimage.util.shape import view_as_windows
+import speechpy
+import soundfile as sf
+
+################################ WARP-Q #######################################
+def compute_WAPRQ(ref_path,test_path,sr=16000,n_mfcc=12,fmax=5000,patch_size=0.4,
+                  sigma=np.array([[1,1],[3,2],[1,3]])):
+
+    # Inputs:
+    # refPath: path of reference speech
+    # disPath: path pf degraded speech
+    # sr: sampling frequency, Hz
+    # n_mfcc: number of MFCCs
+    # fmax: cutoff frequency
+    # patch_size: size of each patch in s
+    # sigma: step size conditon for DTW
+
+    # Output:
+    # WARP-Q quality score between refPath and disPath
+
+
+    ####################### Load speech files #################################
+    # Load Ref Speech
+    if ref_path[-4:] == '.wav':
+        speech_Ref, sr_Ref = librosa.load(ref_path,sr=sr)
+    else:
+        if ref_path[-4:] == '.SRC': #For ITUT database if applicable
+            speech_Ref, sr_Ref  = sf.read(ref_path, format='RAW', channels=1, samplerate=16000,
+                           subtype='PCM_16', endian='LITTLE')
+            if sr_Ref != sr:
+                speech_Ref = librosa.resample(speech_Ref, sr_Ref, sr)
+                sr_Ref = sr
+
+    # Load Coded Speech
+    if test_path[-4:] == '.wav':
+        speech_Coded, sr_Coded = librosa.load(test_path,sr=sr)
+    else:
+        if test_path[-4:] == '.OUT': #For ITUT database if applicable
+            speech_Coded, sr_Coded  = sf.read(test_path, format='RAW', channels=1, samplerate=16000,
+                           subtype='PCM_16', endian='LITTLE')
+            if sr_Coded != sr:
+                speech_Coded = librosa.resample(speech_Coded, sr_Coded, sr)
+                sr_Coded = sr
+
+    if sr_Ref != sr_Coded:
+        raise ValueError("Reference and degraded signals should have same sampling rate!")
+
+    # Make sure amplitudes are in the range of [-1, 1] otherwise clipping to -1 to 1
+    # after resampling (if applicable). We experienced this issue for TCD-VOIP database only
+    speech_Ref[speech_Ref>1]=1.0
+    speech_Ref[speech_Ref<-1]=-1.0
+
+    speech_Coded[speech_Coded>1]=1.0
+    speech_Coded[speech_Coded<-1]=-1.0
+
+    ###########################################################################
+
+    win_length = int(0.032*sr) #32 ms frame
+    hop_length = int(0.004*sr) #4 ms overlap
+    #hop_length = int(0.016*sr)
+
+    n_fft = 2*win_length
+    lifter = 3
+
+    # DTW Parameters
+    Metric = 'euclidean'
+
+    # VAD Parameters
+    hop_size_vad = 30
+    sr_vad = sr
+    aggresive = 0
+
+    # VAD for Ref speech
+    vact1 = vad(speech_Ref, sr, fs_vad = sr_vad, hop_length = hop_size_vad, vad_mode=aggresive)
+    speech_Ref_vad = speech_Ref[vact1==1]
+
+    # VAD for Coded speech
+    vact2 = vad(speech_Coded, sr, fs_vad = sr_vad, hop_length = hop_size_vad, vad_mode=aggresive)
+    speech_Coded_vad = speech_Coded[vact2==1]
+
+    # Compute MFCC features for the two signals
+
+    mfcc_Ref = librosa.feature.mfcc(y=speech_Ref_vad,sr=sr,n_mfcc=n_mfcc,fmax=fmax,
+                                    n_fft=n_fft,win_length=win_length,hop_length=hop_length,lifter=lifter)
+    mfcc_Coded = librosa.feature.mfcc(y=speech_Coded_vad,sr=sr,n_mfcc=n_mfcc,fmax=fmax,
+                                    n_fft=n_fft,win_length=win_length,hop_length=hop_length,lifter=lifter)
+
+    # Feature Normalisation using CMVNW method
+    mfcc_Ref = speechpy.processing.cmvnw(mfcc_Ref.T,win_size=201,variance_normalization=True).T
+    mfcc_Coded = speechpy.processing.cmvnw(mfcc_Coded.T,win_size=201,variance_normalization=True).T
+
+    # Divid MFCC features of Coded speech into patches
+    cols = int(patch_size/(hop_length/sr))
+    window_shape = (np.size(mfcc_Ref,0), cols)
+    step  = int(cols/2)
+
+    mfcc_Coded_patch = view_as_windows(mfcc_Coded, window_shape, step)
+
+    Acc =[]
+    band_rad = 0.25
+    weights_mul=np.array([1, 1, 1])
+
+    # Compute alignment cose between each patch and Ref MFCC
+    for i in range(mfcc_Coded_patch.shape[1]):
+
+        patch = mfcc_Coded_patch[0][i]
+
+        D, P = librosa.sequence.dtw(X=patch, Y=mfcc_Ref, metric=Metric,
+                                    step_sizes_sigma=sigma, weights_mul=weights_mul,
+                                    band_rad=band_rad, subseq=True, backtrack=True)
+
+        P_librosa = P[::-1, :]
+        b_ast = P_librosa[-1, 1]
+
+        Acc.append(D[-1, b_ast] / D.shape[0])
+
+    # Final score
+    return np.median(Acc).item()
-- 
GitLab