[DEV] encode in 16k personal corpus

This commit is contained in:
Edouard DUPIN 2019-01-27 21:20:50 +01:00
parent d64ac95dd9
commit d8dcf70f1b
4 changed files with 498 additions and 1 deletions

269
debug.py Normal file
View File

@ -0,0 +1,269 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
##
## @author Edouard DUPIN
##
## @copyright 2012, Edouard DUPIN, all right reserved
##
## @license MPL v2.0 (see license file)
##
import os
import threading
import re
# Current console log level (see set_level() for the meaning of each value).
debug_level = 3
# Color flag written by enable_color()/disable_color(); print_compilator()
# only rewrites its input when this flag is True.
debug_color = False
# Escape-sequence prefixes used by the log helpers; all empty by default.
color_default = color_red = color_green = color_yellow = ""
color_blue = color_purple = color_cyan = ""
# Lock held around every print() call done by this module.
debug_lock = threading.Lock()
##
## @brief Select the log level of the console log system
## @param[in] id (int) New value of the log level:
##              0: None
##              1: error
##              2: warning
##              3: info
##              4: debug
##              5: verbose
##              6: extreme_verbose
##
def set_level(id):
    # 'id' shadows the builtin but is part of the public signature, so it stays.
    globals()["debug_level"] = id
##
## @brief Get the current debug level
## @return (int) The value of the log level. See: @ref set_level
##
def get_level():
    # Reading a module-level name needs no 'global' declaration.
    return debug_level
##
## @brief Enable color of the console log system
## @note Sets the color flag and loads the ANSI escape sequence of every
##       module-level color prefix.
##
def enable_color():
    globals().update(
        debug_color=True,
        color_default="\033[00m",
        color_red="\033[31m",
        color_green="\033[32m",
        color_yellow="\033[33m",
        color_blue="\033[01;34m",
        color_purple="\033[35m",
        color_cyan="\033[36m",
    )
##
## @brief Disable color of the console log system
## @note Clears the color flag and resets every module-level color prefix to
##       an empty string.
##
def disable_color():
    global debug_color
    global color_default, color_red, color_green, color_yellow
    global color_blue, color_purple, color_cyan
    # The flag must be False here: print_compilator() checks it to decide
    # whether it rewrites the compiler output.
    debug_color = False
    color_default = ""
    color_red = ""
    color_green = ""
    color_yellow = ""
    color_blue = ""
    color_purple = ""
    color_cyan = ""
##
## @brief Print an extreme verbose log
## @param[in] input (string) Value to print if level is enough (level >= 6)
## @param[in] force (bool) force display (no check of log level)
##
def extreme_verbose(input, force=False):
    if debug_level >= 6 or force:
        # 'with' releases the lock even when building or printing the line
        # raises, so a failing call cannot block every later log call.
        with debug_lock:
            print(color_blue + input + color_default)
##
## @brief Print a verbose log
## @param[in] input (string) Value to print if level is enough (level >= 5)
## @param[in] force (bool) force display (no check of log level)
##
def verbose(input, force=False):
    if debug_level >= 5 or force:
        # 'with' releases the lock even when building or printing the line
        # raises, so a failing call cannot block every later log call.
        with debug_lock:
            print(color_blue + input + color_default)
##
## @brief Print a log every time (the log level is never checked)
## @param[in] input (string) Value to print
## @param[in] force (bool) Accepted like in the other log functions but not
##                         used: the message is always printed
##
def display(input, force=False):
    # 'with' releases the lock even when building or printing the line raises,
    # so a failing call cannot block every later log call.
    with debug_lock:
        print(color_blue + input + color_default)
##
## @brief Print a debug log
## @param[in] input (string) Value to print if level is enough (level >= 4)
## @param[in] force (bool) force display (no check of log level)
##
def debug(input, force=False):
    if debug_level >= 4 or force:
        # 'with' releases the lock even when building or printing the line
        # raises, so a failing call cannot block every later log call.
        with debug_lock:
            print(color_green + input + color_default)
##
## @brief Print an info log
## @param[in] input (string) Value to print if level is enough (level >= 3)
## @param[in] force (bool) force display (no check of log level)
##
def info(input, force=False):
    if debug_level >= 3 or force:
        # 'with' releases the lock even when building or printing the line
        # raises, so a failing call cannot block every later log call.
        with debug_lock:
            # Info lines carry no color prefix, only the reset suffix.
            print(input + color_default)
##
## @brief Print a warning log (prefixed with "[WARNING] ")
## @param[in] input (string) Value to print if level is enough (level >= 2)
## @param[in] force (bool) force display (no check of log level)
##
def warning(input, force=False):
    if debug_level >= 2 or force:
        # 'with' releases the lock even when building or printing the line
        # raises, so a failing call cannot block every later log call.
        with debug_lock:
            print(color_purple + "[WARNING] " + input + color_default)
##
## @brief Print a todo log (prefixed with "[TODO] ")
## @param[in] input (string) Value to print if level is enough (level >= 3)
## @param[in] force (bool) force display (no check of log level)
##
def todo(input, force=False):
    if debug_level >= 3 or force:
        # 'with' releases the lock even when building or printing the line
        # raises, so a failing call cannot block every later log call.
        with debug_lock:
            print(color_purple + "[TODO] " + input + color_default)
##
## @brief Print an error log (prefixed with "[ERROR] ") and, by default, stop
##        the program
## @param[in] input (string) Value to print if level is enough (level >= 1)
## @param[in] thread_id (int) Current thread ID of the builder thread (not
##                            used by this implementation)
## @param[in] force (bool) force display (no check of log level)
## @param[in] crash (bool) When true, raise SystemExit with code -1 after the
##                         log, whether or not the message was displayed
##
def error(input, thread_id=-1, force=False, crash=True):
    if debug_level >= 1 or force:
        # 'with' releases the lock even when building or printing the line
        # raises, so a failing call cannot block every later log call.
        with debug_lock:
            print(color_red + "[ERROR] " + input + color_default)
    if crash:
        # Raise SystemExit directly: the exit() builtin is installed by the
        # 'site' module and is not guaranteed to exist in every interpreter.
        raise SystemExit(-1)
##
## @brief Print a log for a specific element action like generating .so or binary ...
## @param[in] type (string) type of action. Like: "copy file", "StaticLib", "Prebuild", "Library" ...
## @param[in] lib (string) Name of the library/binary/package that action is done
## @param[in] dir (string) build direction. ex: "<==", "==>" ...
## @param[in] name (string) Destination of the data
## @param[in] force (bool) force display (no check of log level)
## @note Displayed when the log level is >= 3. 'type' and 'dir' shadow
##       builtins but are part of the public signature.
##
def print_element(type, lib, dir, name, force=False):
    if debug_level >= 3 or force:
        # 'with' releases the lock even when building or printing the line
        # raises, so a failing call cannot block every later log call.
        with debug_lock:
            print(color_cyan + type + color_default + " : " + color_yellow + lib + color_default + " " + dir + " " + color_blue + name + color_default)
##
## @brief Print a compilation return (output)
## @param[in] my_string (string) Std-error/std-info that is generated by the build system
## @note When the color flag is set, escaped "\n"/"\t" sequences are expanded
##       and the "error:", "warning:", "note:" markers as well as
##       "<file>.<ext>:" prefixes are wrapped with the color prefixes.
##
def print_compilator(my_string):
    if debug_color:
        my_string = my_string.replace('\\n', '\n')
        my_string = my_string.replace('\\t', '\t')
        my_string = my_string.replace('error:', color_red+'error:'+color_default)
        my_string = my_string.replace('warning:', color_purple+'warning:'+color_default)
        my_string = my_string.replace('note:', color_green+'note:'+color_default)
        # Tag file names with placeholders first, then swap the placeholders
        # for the color prefixes.
        my_string = re.sub(r'([/\w_-]+\.\w+):', r'-COLORIN-\1-COLOROUT-:', my_string)
        my_string = my_string.replace('-COLORIN-', color_yellow)
        my_string = my_string.replace('-COLOROUT-', color_default)
    # 'with' releases the lock even when print() raises, so a failing call
    # cannot block every later log call.
    with debug_lock:
        print(my_string)
##
## @brief Get the color prefixes currently in use
## @return (dict) A map with keys: "default","red","green","yellow","blue","purple","cyan"
##         whose values are the current module-level color strings
##
def get_color_set():
    # Only reads module-level names, so no 'global' declaration is needed.
    return dict(
        default=color_default,
        red=color_red,
        green=color_green,
        yellow=color_yellow,
        blue=color_blue,
        purple=color_purple,
        cyan=color_cyan,
    )

File diff suppressed because one or more lines are too long

102
proprocessCorpus.py Executable file
View File

@ -0,0 +1,102 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
##
## @author Edouard DUPIN
##
## @license MPL v2.0 (see license file)
##
import os
import tools
import debug
import argparse
import math
import json
import resampy
import numpy as np
import scipy.io.wavfile as wavfile
# ---------------------------------------------
# -- Command line
# ---------------------------------------------
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", help="Input directory",
                    default="")
parser.add_argument("-o", "--output", help="Output directory",
                    default="out")
parser.add_argument("-v", "--verbose", help="display all LOGS",
                    default=False,
                    action='store_true')
args = parser.parse_args()

debug.info("***********************************")
# ---------------------------------------------
# -- check input
# ---------------------------------------------
if args.verbose:
    debug.set_level(6)
if not args.input:
    debug.error("must set an input directory")

tools.create_directory(args.output)

debug.info("==================================================================================================")
debug.info("== Preprocess corpus data: " + args.input + " to " + args.output)
debug.info("==================================================================================================")

debug.info("Get list of corpus files:")
audio_corpus_element = tools.get_list_of_file_in_path(args.input, ["*.json"], recursive=True)
debug.info("Corpus count " + str(len(audio_corpus_element)) + " element(s)")

# Metadata fields that must be present, with the only value accepted for each.
required_fields = (
    ("audio_format", "int16"),
    ("audio_channel", 1),
    ("audio_sample_rate", 48000),
)
# Sample rate of the generated corpus.
output_sample_rate = 16000

for elem_id, elem in enumerate(audio_corpus_element):
    debug.info("---------------------------[ " + str(elem_id) + " / " + str(len(audio_corpus_element)) + " ]---------------------------------------")
    debug.info("Element: " + elem)
    with open(elem) as file:
        input_data = json.load(file)
    # Example of a metadata file:
    # {
    #     "user": "Edouard DUPIN",
    #     "value": "bonjour",
    #     "language": "FR_fr",
    #     "time": 3088499332851,
    #     "audio_format": "int16",
    #     "audio_channel": 1,
    #     "audio_sample_rate": 48000,
    #     "audio_filename": "FR_fr_Edouard DUPIN_3088499332851.raw"
    # }
    # NOTE(review): the checks below rely on debug.error() stopping the
    # program (its default crash=True) -- confirm the 'debug' module in use.
    for field, expected in required_fields:
        if field not in input_data:
            debug.error(" ==> missing field '" + field + "' ...")
        if input_data[field] != expected:
            debug.error(" ==> field '" + field + "' have wrong value: '" + str(input_data[field]) + "' suported: [" + str(expected) + "]")
    if "audio_filename" not in input_data:
        debug.error(" ==> missing field 'audio_filename' ...")

    # The audio file is looked up next to its metadata file.
    filename = os.path.join(os.path.dirname(elem), input_data["audio_filename"])
    if filename[-3:] == "wav":
        sample_rate, audio_data = wavfile.read(filename)
    else:
        debug.error("Not supported file type: '" + str(filename[-3:]) + "' suported: [wav]")
    debug.info("Read: " + str(len(audio_data)) + " sample(s)")

    # Resample from the rate stored in the WAV header: the metadata is only a
    # declaration and resampling from a wrong rate would change the pitch.
    if sample_rate != input_data["audio_sample_rate"]:
        debug.warning("WAV header sample rate (" + str(sample_rate) + ") differs from metadata (" + str(input_data["audio_sample_rate"]) + "); resampling from the header value")
    # NOTE(review): only the metadata is checked for mono; a multi-channel WAV
    # would need an explicit 'axis' here -- confirm the corpus is always mono.
    audio_16k = resampy.resample(audio_data, sample_rate, output_sample_rate)
    # resample() may return floating point data for integer input; convert
    # back to int16 so that the written file matches the "int16" metadata.
    if np.issubdtype(audio_16k.dtype, np.floating):
        int16_range = np.iinfo(np.int16)
        audio_16k = np.clip(np.rint(audio_16k), int16_range.min, int16_range.max).astype(np.int16)
    debug.info("write: " + str(len(audio_16k)) + " sample(s)")

    filename_out = os.path.join(args.output, input_data["audio_filename"])
    wavfile.write(filename_out, output_sample_rate, audio_16k)

    # Store the metadata next to the converted audio with the new rate.
    input_data["audio_sample_rate"] = output_sample_rate
    filename_out_json = os.path.join(args.output, os.path.basename(elem))
    with open(filename_out_json, 'w') as outfile:
        json.dump(input_data, outfile, indent="\t")

debug.info("Finish")

126
tools.py Normal file
View File

@ -0,0 +1,126 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
##
## @author Edouard DUPIN
##
## @copyright 2012, Edouard DUPIN, all right reserved
##
## @license MPL v2.0 (see license file)
##
import os
import shutil
import errno
import fnmatch
import stat
##
## @brief Create a directory (and its missing parents) when it does not exist
## @param[in] path (string) Directory to create. An empty path (e.g. the
##                          dirname of a bare file name) is ignored.
##
def create_directory(path):
    if not path:
        return
    try:
        os.makedirs(path)
    except OSError as exc:
        # An already existing path is the expected case; any other failure
        # (permission, invalid name, ...) must reach the caller.
        if exc.errno != errno.EEXIST:
            raise
##
## @brief Create the directory that contains a file
## @param[in] file (string) Path of the file whose parent directory is needed
##
def create_directory_of_file(file):
    create_directory(os.path.dirname(file))
##
## @brief Write text data in a file, creating its directory when needed
## @param[in] path (string) File to write (opened in "w" mode, so any
##                          previous content is replaced)
## @param[in] data (string) Content to write
## @return (bool) Always True
##
def file_write_data(path, data):
    print(" write file: " + path)
    create_directory_of_file(path)
    # 'with' closes the file even when write() raises.
    with open(path, "w") as file:
        file.write(data)
    return True
##
## @brief Remove a file or a symbolic link; anything else is left untouched
## @param[in] path (string) Path of the element to remove
##
def remove_file(path):
    if os.path.isfile(path) or os.path.islink(path):
        os.remove(path)
##
## @brief Read the whole content of a file
## @param[in] path (string) File to read
## @param[in] binary (bool) Read in binary mode ("rb") instead of text mode
## @return (string or bytes) The file content, or "" when path is not a file
##
def file_read_data(path, binary=False):
    print("path= " + path)
    if not os.path.isfile(path):
        return ""
    # 'with' closes the file even when read() raises.
    with open(path, "rb" if binary else "r") as file:
        return file.read()
##
## @brief Copy the content of a file, creating the destination directory
## @param[in] src (string) File to copy
## @param[in] dst (string) Destination file
##
def copy_file(src, dst):
    print("copy " + src + " ==> " + dst)
    # Make sure the directory that will hold the copy exists.
    create_directory(os.path.dirname(dst))
    shutil.copyfile(src, dst)
##
## @brief Copy the files of one directory (not its sub-directories)
## @param[in] src (string) A directory (all its files are copied) or a
##                         "directory/pattern" path, where the last component
##                         is an fnmatch pattern selecting the files
## @param[in] dst (string) Destination directory
##
def copy_anything(src, dst):
    print(" copy anything : '" + str(src) + "'")
    print(" to : '" + str(dst) + "'")
    real_src = os.path.realpath(src)
    if os.path.isdir(real_src):
        tmp_path, tmp_rule = real_src, ""
    else:
        tmp_path, tmp_rule = os.path.dirname(real_src), os.path.basename(src)
    for root, _dirnames, filenames in os.walk(tmp_path):
        # Path of the walked directory relative to the start directory.
        delta_root = root[len(tmp_path):].lstrip("/\\")
        if delta_root:
            # First sub-directory reached: only the top level is copied.
            return
        selected = fnmatch.filter(filenames, tmp_rule) if tmp_rule else filenames
        for file_name in selected:
            copy_file(os.path.join(tmp_path, delta_root, file_name),
                      os.path.join(dst, delta_root, file_name))
##
## @brief Get list of all files in a specific path (selected with patterns)
## @param[in] path (string) Directory where the files are searched
## @param[in] filter (list of string) fnmatch patterns (ex: ["*.json"]); a file
##                   is listed once per pattern it matches
## @param[in] recursive (bool) List file with recursive search
## @param[in] remove_path (string) Prefix to remove from every returned path
##                   (the separator that follows it is removed too)
## @return (list) Files requested; an empty list when path is not a directory
##
def get_list_of_file_in_path(path, filter, recursive = False, remove_path=""):
    out = []
    tmp_path = os.path.realpath(path)
    if not os.path.isdir(tmp_path):
        print("[E] path does not exist : '" + str(path) + "'")
        # Nothing can be listed: return the empty result instead of walking
        # an undefined path.
        return out
    for root, _dirnames, filenames in os.walk(tmp_path):
        # Path of the walked directory relative to the start directory.
        delta_root = root[len(tmp_path):].lstrip("/\\")
        if not recursive and delta_root:
            # First sub-directory reached: the top level is fully listed.
            return out
        for pattern in filter:
            for file_name in fnmatch.filter(filenames, pattern):
                add_file = os.path.join(tmp_path, delta_root, file_name)
                if remove_path:
                    if add_file[:len(remove_path)] != remove_path:
                        # The path is still returned, unmodified.
                        print("[E] Request remove start of a path that is not the same: '" + add_file[:len(remove_path)] + "' demand remove of '" + str(remove_path) + "'")
                    else:
                        add_file = add_file[len(remove_path)+1:]
                out.append(add_file)
    return out