1451 lines
47 KiB
Python
Executable File
1451 lines
47 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# ##### BEGIN GPL LICENSE BLOCK #####
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
#
|
|
# ##### END GPL LICENSE BLOCK #####
|
|
|
|
"""
|
|
This is a utility that activates text to speech in Linux.
|
|
While it could use any system currently it uses the VOSK-API.
|
|
"""
|
|
import easymacro as app
|
|
# See: `hacking.rst` for developer notes.
|
|
|
|
# All built in modules.
|
|
import argparse
|
|
import os
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
# Types.
|
|
from typing import (
|
|
Dict,
|
|
IO,
|
|
List,
|
|
Optional,
|
|
Callable,
|
|
Set,
|
|
Tuple,
|
|
)
|
|
from types import (
|
|
ModuleType,
|
|
)
|
|
|
|
TEMP_COOKIE_NAME = "nerd-dictation.cookie"
|
|
|
|
USER_CONFIG_DIR = "nerd-dictation"
|
|
|
|
USER_CONFIG = "nerd-dictation.py"
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# General Utilities
|
|
#
|
|
|
|
|
|
def run_xdotool(subcommand: str, payload: List[str]) -> None:
|
|
cmd_base = "xdotool"
|
|
cmd = [
|
|
cmd_base,
|
|
subcommand,
|
|
"--clearmodifiers",
|
|
] + payload
|
|
try:
|
|
# ~ print(cmd)
|
|
# ~ type or key
|
|
app_name = app.active.type
|
|
if app_name == 'writer':
|
|
rango = app.selection.goto_end()
|
|
if subcommand == 'type':
|
|
word = payload[-1]
|
|
rango.string = word
|
|
else:
|
|
rango.go_left(count=len(payload), expand=True).string = ''
|
|
elif app_name == 'calc':
|
|
rango = app.selection
|
|
current = rango.value or ''
|
|
if subcommand == 'type':
|
|
word = payload[-1]
|
|
rango.str = current + word
|
|
else:
|
|
current = current[:len(payload)*-1]
|
|
rango.str = current
|
|
# ~ subprocess.check_output(cmd)
|
|
except FileNotFoundError as ex:
|
|
print("Command {!r} not found: {!s}".format(cmd_base, ex))
|
|
sys.exit(1)
|
|
|
|
# Don't catch other kinds of exceptions as they should never happen
|
|
# and can be considered a severe error which doesn't need to be made "user friendly".
|
|
|
|
|
|
def touch(filepath: str, mtime: Optional[float] = None) -> None:
|
|
if os.path.exists(filepath):
|
|
os.utime(filepath, None if mtime is None else (mtime, mtime))
|
|
else:
|
|
open(filepath, "ab").close()
|
|
if mtime is not None:
|
|
try:
|
|
os.utime(filepath, (mtime, mtime))
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
def file_mtime_or_none(filepath: str) -> Optional[float]:
|
|
try:
|
|
return os.stat(filepath)[stat.ST_MTIME]
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
|
|
def file_age_in_seconds(filepath: str) -> float:
|
|
"""
|
|
Return the age of the file in seconds.
|
|
"""
|
|
return time.time() - os.stat(filepath)[stat.ST_MTIME]
|
|
|
|
|
|
def file_remove_if_exists(filepath: str) -> bool:
|
|
try:
|
|
os.remove(filepath)
|
|
return True
|
|
except OSError:
|
|
return False
|
|
|
|
|
|
def file_handle_make_non_blocking(file_handle: IO[bytes]) -> None:
|
|
import fcntl
|
|
|
|
# Get current `file_handle` flags.
|
|
flags = fcntl.fcntl(file_handle.fileno(), fcntl.F_GETFL)
|
|
fcntl.fcntl(file_handle, fcntl.F_SETFL, flags | os.O_NONBLOCK)
|
|
|
|
|
|
def execfile(filepath: str, mod: Optional[ModuleType] = None) -> Optional[ModuleType]:
|
|
"""
|
|
Execute a file path as a Python script.
|
|
"""
|
|
import importlib.util
|
|
|
|
if not os.path.exists(filepath):
|
|
raise FileNotFoundError('File not found "{:s}"'.format(filepath))
|
|
|
|
mod_name = "__main__"
|
|
mod_spec = importlib.util.spec_from_file_location(mod_name, filepath)
|
|
if mod_spec is None:
|
|
raise Exception("Unable to retrieve the module-spec from %r" % filepath)
|
|
if mod is None:
|
|
mod = importlib.util.module_from_spec(mod_spec)
|
|
|
|
# While the module name is not added to `sys.modules`, it's important to temporarily
|
|
# include this so statements such as `sys.modules[cls.__module__].__dict__` behave as expected.
|
|
# See: https://bugs.python.org/issue9499 for details.
|
|
modules = sys.modules
|
|
mod_orig = modules.get(mod_name, None)
|
|
modules[mod_name] = mod
|
|
|
|
# No error suppression, just ensure `sys.modules[mod_name]` is properly restored in the case of an error.
|
|
try:
|
|
# `mypy` doesn't know about this function.
|
|
mod_spec.loader.exec_module(mod) # type: ignore
|
|
finally:
|
|
if mod_orig is None:
|
|
modules.pop(mod_name, None)
|
|
else:
|
|
modules[mod_name] = mod_orig
|
|
|
|
return mod
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Custom Configuration
|
|
#
|
|
|
|
|
|
def calc_user_config_path(rest: Optional[str]) -> str:
|
|
"""
|
|
Path to the user's configuration directory.
|
|
"""
|
|
base = os.environ.get("XDG_CONFIG_HOME")
|
|
if base is None:
|
|
base = os.path.expanduser("~")
|
|
if os.name == "posix":
|
|
base = os.path.join(base, ".config")
|
|
|
|
base = os.path.join(base, USER_CONFIG_DIR)
|
|
if rest:
|
|
base = os.path.join(base, rest)
|
|
return base
|
|
|
|
|
|
def user_config_as_module_or_none(
|
|
config_override: Optional[str],
|
|
) -> Optional[ModuleType]:
|
|
|
|
# Explicitly ask for no configuration.
|
|
if config_override == "":
|
|
return None
|
|
if config_override is None:
|
|
user_config_path = calc_user_config_path(USER_CONFIG)
|
|
if not os.path.exists(user_config_path):
|
|
return None
|
|
else:
|
|
user_config_path = config_override
|
|
# Allow the exception for a custom configuration.
|
|
|
|
try:
|
|
user_config = execfile(user_config_path)
|
|
except Exception as ex:
|
|
sys.stderr.write('Failed to run "{:s}" with error: {:s}\n'.format(user_config_path, str(ex)))
|
|
sys.exit(1)
|
|
|
|
return user_config
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Number Parsing
|
|
#
|
|
# Note this could be extracted into it's own small library.
|
|
|
|
|
|
def from_words_to_digits_setup_once() -> Tuple[
|
|
Dict[str, Tuple[int, int, str, bool]], Set[str], Set[str], Set[str], Set[str]
|
|
]:
|
|
|
|
number_words = {}
|
|
# A set of words that can be used to start numeric expressions.
|
|
valid_digit_words: Set[str] = set()
|
|
|
|
# Singles.
|
|
units = (
|
|
(("zero", ""), ("zeroes", "'s"), ("zeroth", "th")),
|
|
(("one", ""), ("ones", "'s"), ("first", "st")),
|
|
(("two", ""), ("twos", "'s"), ("second", "nd")),
|
|
(("three", ""), ("threes", "'s"), ("third", "rd")),
|
|
(("four", ""), ("fours", "'s"), ("fourth", "th")),
|
|
(("five", ""), ("fives", "'s"), ("fifth", "th")),
|
|
(("six", ""), ("sixes", "'s"), ("sixth", "th")),
|
|
(("seven", ""), ("sevens", "'s"), ("seventh", "th")),
|
|
(("eight", ""), ("eights", "'s"), ("eighth", "th")),
|
|
(("nine", ""), ("nines", "'s"), ("ninth", "th")),
|
|
(("ten", ""), ("tens", "'s"), ("tenth", "th")),
|
|
(("eleven", ""), ("elevens", "'s"), ("eleventh", "th")),
|
|
(("twelve", ""), ("twelves", "'s"), ("twelfth", "th")),
|
|
(("thirteen", ""), ("thirteens", "'s"), ("thirteenth", "th")),
|
|
(("fourteen", ""), ("fourteens", "'s"), ("fourteenth", "th")),
|
|
(("fifteen", ""), ("fifteens", "'s"), ("fifteenth", "th")),
|
|
(("sixteen", ""), ("sixteens", "'s"), ("sixteenth", "th")),
|
|
(("seventeen", ""), ("seventeens", "'s"), ("seventeenth", "th")),
|
|
(("eighteen", ""), ("eighteens", "'s"), ("eighteenth", "th")),
|
|
(("nineteen", ""), ("nineteens", "'s"), ("nineteenth", "th")),
|
|
)
|
|
|
|
# Tens.
|
|
units_tens = (
|
|
(("", ""), ("", ""), ("", "")),
|
|
(("", ""), ("", ""), ("", "")),
|
|
(("twenty", ""), ("twenties", "'s"), ("twentieth", "th")),
|
|
(("thirty", ""), ("thirties", "'s"), ("thirtieth", "th")),
|
|
(("forty", ""), ("forties", "'s"), ("fortieth", "th")),
|
|
(("fifty", ""), ("fifties", "'s"), ("fiftieth", "th")),
|
|
(("sixty", ""), ("sixties", "'s"), ("sixtieth", "th")),
|
|
(("seventy", ""), ("seventies", "'s"), ("seventieth", "th")),
|
|
(("eighty", ""), ("eighties", "'s"), ("eightieth", "th")),
|
|
(("ninety", ""), ("nineties", "'s"), ("ninetieth", "th")),
|
|
)
|
|
|
|
# Larger scales.
|
|
scales = (
|
|
((("hundred", ""), ("hundreds", "s"), ("hundredth", "th")), 2),
|
|
((("thousand", ""), ("thousands", "s"), ("thousandth", "th")), 3),
|
|
((("million", ""), ("millions", "s"), ("millionth", "th")), 6),
|
|
((("billion", ""), ("billions", "s"), ("billionth", "th")), 9),
|
|
((("trillion", ""), ("trillions", "s"), ("trillionth", "th")), 12),
|
|
((("quadrillion", ""), ("quadrillions", "s"), ("quadrillionth", "th")), 15),
|
|
((("quintillion", ""), ("quintillions", "s"), ("quintillionth", "th")), 18),
|
|
((("sextillion", ""), ("sextillions", "s"), ("sextillionth", "th")), 21),
|
|
((("septillion", ""), ("septillions", "s"), ("septillionth", "th")), 24),
|
|
((("octillion", ""), ("octillions", "s"), ("octillionth", "th")), 27),
|
|
((("nonillion", ""), ("nonillions", "s"), ("nonillionth", "th")), 30),
|
|
((("decillion", ""), ("decillions", "s"), ("decillionth", "th")), 33),
|
|
((("undecillion", ""), ("undecillions", "s"), ("undecillionth", "th")), 36),
|
|
((("duodecillion", ""), ("duodecillions", "s"), ("duodecillionth", "th")), 39),
|
|
((("tredecillion", ""), ("tredecillions", "s"), ("tredecillionth", "th")), 42),
|
|
((("quattuordecillion", ""), ("quattuordecillions", "s"), ("quattuordecillionth", "th")), 45),
|
|
((("quindecillion", ""), ("quindecillions", "s"), ("quindecillionth", "th")), 48),
|
|
((("sexdecillion", ""), ("sexdecillions", "s"), ("sexdecillionth", "th")), 51),
|
|
((("septendecillion", ""), ("septendecillions", "s"), ("septendecillionth", "th")), 54),
|
|
((("octodecillion", ""), ("octodecillions", "s"), ("octodecillionth", "th")), 57),
|
|
((("novemdecillion", ""), ("novemdecillions", "s"), ("novemdecillionth", "th")), 60),
|
|
((("vigintillion", ""), ("vigintillions", "s"), ("vigintillionth", "th")), 63),
|
|
((("centillion", ""), ("centillions", "s"), ("centillionth", "th")), 303),
|
|
)
|
|
|
|
# Divisors (not final).
|
|
number_words["and"] = (1, 0, "", False)
|
|
|
|
# Perform our loops and start the swap.
|
|
for idx, word_pairs in enumerate(units):
|
|
for word, suffix in word_pairs:
|
|
number_words[word] = (1, idx, suffix, True)
|
|
for idx, word_pairs in enumerate(units_tens):
|
|
for word, suffix in word_pairs:
|
|
number_words[word] = (1, idx * 10, suffix, True)
|
|
for word_pairs, power in scales:
|
|
for word, suffix in word_pairs:
|
|
number_words[word] = (10**power, 0, suffix, True)
|
|
|
|
# Needed for 'imply_single_unit'
|
|
valid_scale_words = set()
|
|
for word_pairs, _power in scales:
|
|
for word, _suffix in word_pairs:
|
|
valid_scale_words.add(word)
|
|
|
|
valid_unit_words = set()
|
|
for units_iter in (units, units_tens):
|
|
for word_pairs in units_iter:
|
|
for word, _suffix in word_pairs:
|
|
valid_unit_words.add(word)
|
|
|
|
valid_zero_words = {word for (word, _suffix) in units[0]}
|
|
|
|
valid_digit_words.update(number_words.keys())
|
|
valid_digit_words.remove("and")
|
|
valid_digit_words.remove("")
|
|
return (
|
|
number_words,
|
|
valid_digit_words,
|
|
valid_unit_words,
|
|
valid_scale_words,
|
|
valid_zero_words,
|
|
)
|
|
|
|
|
|
# Originally based on: https://ao.gl/how-to-convert-numeric-words-into-numbers-using-python/
|
|
# A module like class can't be instanced.
|
|
class from_words_to_digits:
|
|
|
|
(
|
|
_number_words,
|
|
valid_digit_words,
|
|
valid_unit_words,
|
|
valid_scale_words,
|
|
valid_zero_words,
|
|
) = from_words_to_digits_setup_once()
|
|
|
|
@staticmethod
|
|
def _parse_number_as_whole_value(
|
|
word_list: List[str],
|
|
word_list_len: int,
|
|
word_index: int,
|
|
imply_single_unit: bool = False,
|
|
force_single_units: bool = False,
|
|
) -> Tuple[str, str, int, bool]:
|
|
number_words = from_words_to_digits._number_words
|
|
valid_scale_words = from_words_to_digits.valid_scale_words
|
|
valid_unit_words = from_words_to_digits.valid_unit_words
|
|
valid_zero_words = from_words_to_digits.valid_zero_words
|
|
|
|
if imply_single_unit:
|
|
only_scale = True
|
|
|
|
# Allow reformatting for a regular number.
|
|
allow_reformat = True
|
|
|
|
# Primary loop.
|
|
current = result = 0
|
|
suffix = ""
|
|
|
|
# This prevents "one and" from being evaluated.
|
|
is_final = False
|
|
# increment_final = 0 # UNUSED.
|
|
increment_final_real = 0
|
|
scale_final = 0
|
|
word_index_final = -1
|
|
result_final = ("", "", word_index, allow_reformat)
|
|
|
|
# Loop while splitting to break into individual words.
|
|
while word_index < word_list_len:
|
|
word_data = number_words.get(word_list[word_index])
|
|
|
|
if word_data is None:
|
|
# raise Exception('Illegal word: ' + word)
|
|
break
|
|
|
|
# When explicitly stated, the word "zero" should terminate the current number and start a new value.
|
|
# Since it doesn't make sense to say "fifty zero" as it does to say "fifty one".
|
|
if word_index_final != -1 and word_list[word_index] in valid_zero_words:
|
|
break
|
|
|
|
# Use the index by the multiplier.
|
|
scale, increment, suffix, is_final = word_data
|
|
increment_real = increment
|
|
if force_single_units:
|
|
if increment != 0:
|
|
increment = 1
|
|
|
|
# This prevents "three and two" from resolving to "5".
|
|
# which we never want, unlike "three hundred and two" which resolves to "302"
|
|
if word_index_final != -1:
|
|
if not is_final:
|
|
if word_list[word_index_final - 1] in valid_unit_words:
|
|
break
|
|
|
|
# Check the unit words can be combined!
|
|
# Saying "twenty one" makes sense but the following cases don't:
|
|
# - "twenty twelve"
|
|
# - "ninety fifty"
|
|
if scale_final == scale:
|
|
if word_list[word_index] in valid_unit_words and word_list[word_index_final] in valid_unit_words:
|
|
if not (increment_final_real >= 20 and increment_real < 10):
|
|
break
|
|
|
|
if imply_single_unit:
|
|
if only_scale:
|
|
if word_list[word_index] not in valid_scale_words:
|
|
only_scale = False
|
|
|
|
if only_scale and current == 0 and result == 0:
|
|
current = 1 * scale
|
|
word_index += 1
|
|
break
|
|
|
|
current = (current * scale) + increment
|
|
|
|
# If larger than 100 then push for a round 2.
|
|
if scale > 100:
|
|
result += current
|
|
current = 0
|
|
|
|
word_index += 1
|
|
|
|
if is_final:
|
|
result_final = ("{:d}".format(result + current), suffix, word_index, allow_reformat)
|
|
word_index_final = word_index
|
|
scale_final = scale
|
|
# increment_final = increment
|
|
increment_final_real = increment_real
|
|
|
|
# Once there is a suffix, don't attempt to parse extra numbers.
|
|
if suffix:
|
|
break
|
|
|
|
if not is_final:
|
|
# Use the last final result as the output (this resolves problems with a trailing 'and')
|
|
return result_final
|
|
|
|
# Return the result plus the current.
|
|
return "{:d}".format(result + current), suffix, word_index, allow_reformat
|
|
|
|
@staticmethod
|
|
def _allow_follow_on_word(w_prev: str, w: str) -> bool:
|
|
valid_unit_words = from_words_to_digits.valid_unit_words
|
|
number_words = from_words_to_digits._number_words
|
|
|
|
if not w_prev in valid_unit_words:
|
|
return False
|
|
if not w in valid_unit_words:
|
|
return False
|
|
increment_prev = number_words[w_prev][1]
|
|
increment = number_words[w][1]
|
|
if (increment_prev >= 20) and (increment < 10) and (increment != 0):
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def parse_number_calc_delimiter_from_series(
|
|
word_list: List[str],
|
|
word_index: int,
|
|
word_index_len: int,
|
|
) -> int:
|
|
|
|
valid_unit_words = from_words_to_digits.valid_unit_words
|
|
number_words = from_words_to_digits._number_words
|
|
|
|
i = word_index
|
|
i_span_beg = word_index
|
|
w_prev = ""
|
|
result_prev = None
|
|
result_test = None
|
|
while i < word_index_len:
|
|
w = word_list[i]
|
|
if w not in number_words:
|
|
break
|
|
|
|
if (i != word_index) and from_words_to_digits._allow_follow_on_word(word_list[i - 1], w):
|
|
# Don't set `w_prev` so we can detect "thirteen and fifty five" without the last "five" delimiting.
|
|
pass
|
|
else:
|
|
if (w_prev not in {"", "and"}) and w in valid_unit_words:
|
|
# Exception ... allow "thirty three", two words...
|
|
result_prev = result_test
|
|
result_test = from_words_to_digits._parse_number_as_whole_value(
|
|
word_list,
|
|
i, # Limit.
|
|
i_span_beg, # Split start.
|
|
force_single_units=True,
|
|
)
|
|
assert result_test[2] == i
|
|
|
|
if result_prev:
|
|
if len(result_prev[0]) == len(result_test[0]):
|
|
return result_prev[2]
|
|
|
|
i_span_beg = i
|
|
w_prev = w
|
|
i += 1
|
|
|
|
result_prev = result_test
|
|
result_test = from_words_to_digits._parse_number_as_whole_value(
|
|
word_list,
|
|
i, # Limit.
|
|
i_span_beg, # Split start.
|
|
force_single_units=True,
|
|
)
|
|
|
|
if result_prev:
|
|
if len(result_prev[0]) == len(result_test[0]):
|
|
return result_prev[2]
|
|
|
|
return word_index_len
|
|
|
|
@staticmethod
|
|
def parse_number_calc_delimiter_from_slide(
|
|
word_list: List[str],
|
|
word_index: int,
|
|
word_index_len: int,
|
|
) -> int:
|
|
|
|
valid_unit_words = from_words_to_digits.valid_unit_words
|
|
number_words = from_words_to_digits._number_words
|
|
i = word_index
|
|
w_prev = ""
|
|
while i < word_index_len:
|
|
w = word_list[i]
|
|
if w not in number_words:
|
|
break
|
|
if (i != word_index) and from_words_to_digits._allow_follow_on_word(word_list[i - 1], w):
|
|
# Don't set `w_prev` so we can detect "thirteen and fifty five" without the last "five" delimiting.
|
|
pass
|
|
else:
|
|
if (w_prev not in {"", "and"}) and w in valid_unit_words:
|
|
result_test_lhs = from_words_to_digits._parse_number_as_whole_value(
|
|
word_list,
|
|
i, # Limit.
|
|
word_index, # Split start.
|
|
force_single_units=True,
|
|
)
|
|
result_test_rhs = from_words_to_digits._parse_number_as_whole_value(
|
|
word_list,
|
|
word_index_len, # Limit.
|
|
i, # Split start.
|
|
force_single_units=True,
|
|
)
|
|
|
|
# If the number on the right is larger, split here.
|
|
if len(result_test_lhs[0]) <= len(result_test_rhs[0]):
|
|
return result_test_lhs[2]
|
|
|
|
w_prev = w
|
|
i += 1
|
|
|
|
return word_index_len
|
|
|
|
@staticmethod
|
|
def parse_number(
|
|
word_list: List[str],
|
|
word_index: int,
|
|
imply_single_unit: bool = False,
|
|
) -> Tuple[str, str, int, bool]:
|
|
|
|
word_list_len = len(word_list)
|
|
|
|
# Delimit, prevent accumulating "one hundred two hundred" -> "300" for example.
|
|
word_list_len = from_words_to_digits.parse_number_calc_delimiter_from_series(
|
|
word_list,
|
|
word_index,
|
|
word_list_len,
|
|
)
|
|
word_list_len = from_words_to_digits.parse_number_calc_delimiter_from_slide(
|
|
word_list,
|
|
word_index,
|
|
word_list_len,
|
|
)
|
|
|
|
return from_words_to_digits._parse_number_as_whole_value(
|
|
word_list,
|
|
word_list_len,
|
|
word_index,
|
|
imply_single_unit=imply_single_unit,
|
|
)
|
|
|
|
@staticmethod
|
|
def parse_numbers_in_word_list(
|
|
word_list: List[str],
|
|
numbers_use_separator: bool = False,
|
|
) -> None:
|
|
i = 0
|
|
i_number_prev = -1
|
|
while i < len(word_list):
|
|
if word_list[i] in from_words_to_digits.valid_digit_words:
|
|
number, suffix, i_next, allow_reformat = from_words_to_digits.parse_number(
|
|
word_list, i, imply_single_unit=True
|
|
)
|
|
if i != i_next:
|
|
word_list[i:i_next] = [
|
|
("{:,d}".format(int(number)) if (numbers_use_separator and allow_reformat) else number)
|
|
+ suffix
|
|
]
|
|
|
|
if (i_number_prev != -1) and (i_number_prev + 1 != i):
|
|
words_between = tuple(word_list[i_number_prev + 1 : i])
|
|
found = True
|
|
# While more could be added here, for now this is enough.
|
|
if words_between == ("point",):
|
|
word_list[i_number_prev : i + 1] = [word_list[i_number_prev] + "." + word_list[i]]
|
|
elif words_between == ("minus",):
|
|
word_list[i_number_prev : i + 1] = [word_list[i_number_prev] + " - " + word_list[i]]
|
|
elif words_between == ("plus",):
|
|
word_list[i_number_prev : i + 1] = [word_list[i_number_prev] + " + " + word_list[i]]
|
|
elif words_between == ("divided", "by"):
|
|
word_list[i_number_prev : i + 1] = [word_list[i_number_prev] + " / " + word_list[i]]
|
|
elif words_between in {("multiplied", "by"), ("times",)}:
|
|
word_list[i_number_prev : i + 1] = [word_list[i_number_prev] + " * " + word_list[i]]
|
|
elif words_between == ("modulo",):
|
|
word_list[i_number_prev : i + 1] = [word_list[i_number_prev] + " % " + word_list[i]]
|
|
else:
|
|
found = False
|
|
|
|
if found:
|
|
i = i_number_prev
|
|
|
|
i_number_prev = i
|
|
i -= 1
|
|
i += 1
|
|
|
|
# Group numbers - recite single digit phone numbers for example.
|
|
# This could be optional, but generally seems handy (good default behavior),
|
|
# e.g. "twenty twenty" -> "2020".
|
|
i = 0
|
|
while i < len(word_list):
|
|
if word_list[i].isdigit() and len(word_list[i]) <= 2:
|
|
j = i + 1
|
|
while j < len(word_list):
|
|
if word_list[j].isdigit() and len(word_list[j]) <= 2:
|
|
j += 1
|
|
else:
|
|
break
|
|
if i + 1 != j:
|
|
word_list[i:j] = ["".join(word_list[i:j])]
|
|
i = j
|
|
else:
|
|
i += 1
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Process Text
|
|
#
|
|
|
|
|
|
def process_text_with_user_config(user_config: ModuleType, text: str) -> str:
|
|
|
|
process_fn_name = "nerd_dictation_process"
|
|
process_fn = getattr(user_config, process_fn_name)
|
|
if process_fn is None:
|
|
sys.stderr.write("User configuration %r has no %r function\n" % (user_config, process_fn_name))
|
|
return text
|
|
|
|
try:
|
|
text = process_fn(text)
|
|
except Exception as ex:
|
|
sys.stderr.write("Failed to run %r with error %s\n" % (user_config, str(ex)))
|
|
sys.exit(1)
|
|
|
|
if not isinstance(text, str):
|
|
sys.stderr.write("%r returned a %r type, instead of a string\n" % (process_fn_name, type(text)))
|
|
sys.exit(1)
|
|
|
|
return text
|
|
|
|
|
|
def process_text(
|
|
text: str,
|
|
*,
|
|
full_sentence: bool = False,
|
|
numbers_as_digits: bool = False,
|
|
numbers_use_separator: bool = False,
|
|
) -> str:
|
|
"""
|
|
Basic post processing on text.
|
|
Mainly to capitalize words however other kinds of replacements may be supported.
|
|
"""
|
|
|
|
# Make absolutely sure we never add new lines in text that is typed in.
|
|
# As this will press the return key when using automated key input.
|
|
text = text.replace("\n", " ")
|
|
words = text.split(" ")
|
|
|
|
# First parse numbers.
|
|
if numbers_as_digits:
|
|
from_words_to_digits.parse_numbers_in_word_list(
|
|
words,
|
|
numbers_use_separator=numbers_use_separator,
|
|
)
|
|
|
|
# Optional?
|
|
if full_sentence:
|
|
words[0] = words[0].capitalize()
|
|
words[-1] = words[-1]
|
|
|
|
return " ".join(words)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Text from VOSK
|
|
#
|
|
|
|
|
|
def text_from_vosk_pipe(
|
|
*,
|
|
vosk_model_dir: str,
|
|
exit_fn: Callable[..., int],
|
|
process_fn: Callable[[str], str],
|
|
handle_fn: Callable[[str, int], None],
|
|
timeout: float,
|
|
idle_time: float,
|
|
progressive: bool,
|
|
progressive_continuous: bool,
|
|
sample_rate: int,
|
|
input_method: str,
|
|
pulse_device_name: str = "",
|
|
) -> bool:
|
|
# Delay some imports until recording has started to avoid minor delays.
|
|
import json
|
|
|
|
if not os.path.exists(vosk_model_dir):
|
|
sys.stderr.write(
|
|
"Please download the model from https://alphacephei.com/vosk/models and unpack it to %r.\n"
|
|
% vosk_model_dir
|
|
)
|
|
sys.exit(1)
|
|
|
|
if input_method == "PAREC":
|
|
cmd = (
|
|
"parec",
|
|
"--record",
|
|
"--rate=%d" % sample_rate,
|
|
"--channels=1",
|
|
*(("--device=%s" % pulse_device_name,) if pulse_device_name else ()),
|
|
"--format=s16ne",
|
|
"--latency=10",
|
|
)
|
|
elif input_method == "SOX":
|
|
cmd = (
|
|
"sox",
|
|
"-q",
|
|
"-V1",
|
|
"-d",
|
|
"--buffer",
|
|
"1000",
|
|
"-r",
|
|
"%d" % sample_rate,
|
|
"-b",
|
|
"16",
|
|
"-e",
|
|
"signed-integer",
|
|
"-c",
|
|
"1",
|
|
"-t",
|
|
"raw",
|
|
"-L",
|
|
"-",
|
|
)
|
|
else:
|
|
sys.stderr.write("--input %r not supported.\n" % input_method)
|
|
sys.exit(1)
|
|
|
|
# ~ print(cmd)
|
|
# ~ ('parec', '--record', '--rate=44100', '--channels=1', '--format=s16ne', '--latency=10')
|
|
ps = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
|
|
|
stdout = ps.stdout
|
|
assert stdout is not None
|
|
|
|
# Needed so whatever is available can be read (without waiting).
|
|
file_handle_make_non_blocking(stdout)
|
|
|
|
# `mypy` doesn't know about VOSK.
|
|
import vosk # type: ignore
|
|
|
|
vosk.SetLogLevel(-1)
|
|
|
|
model = vosk.Model(vosk_model_dir)
|
|
rec = vosk.KaldiRecognizer(model, sample_rate)
|
|
# 1mb (allow for loading the model to take some time).
|
|
block_size = 104_8576
|
|
|
|
use_timeout = timeout != 0.0
|
|
if use_timeout:
|
|
timeout_text_prev = ""
|
|
timeout_time_prev = time.time()
|
|
|
|
# Collect the output used when time-out is enabled.
|
|
if not (progressive and progressive_continuous):
|
|
text_list = []
|
|
|
|
# Set true if handle has been called.
|
|
handled_any = False
|
|
|
|
if progressive:
|
|
text_prev = ""
|
|
|
|
# Track this to prevent excessive load when the "partial" result doesn't change.
|
|
json_text_partial_prev = ""
|
|
|
|
def handle_fn_wrapper(text: str, is_partial_arg: bool) -> None:
|
|
nonlocal handled_any
|
|
# Simple deferred text input, just accumulate values in a list (finish entering text on exit).
|
|
if not progressive:
|
|
if is_partial_arg:
|
|
return
|
|
text_list.append(text)
|
|
handled_any = True
|
|
return
|
|
|
|
# Progressive support (type as you speak).
|
|
nonlocal text_prev
|
|
if progressive_continuous:
|
|
text_curr = process_fn(text)
|
|
else:
|
|
text_curr = process_fn(" ".join(text_list + [text]))
|
|
|
|
if text_curr != text_prev:
|
|
match = min(len(text_curr), len(text_prev))
|
|
for i in range(min(len(text_curr), len(text_prev))):
|
|
if text_curr[i] != text_prev[i]:
|
|
match = i
|
|
break
|
|
|
|
# Emit text, deleting any previous incorrectly transcribed output
|
|
handle_fn(text_curr[match:], len(text_prev) - match)
|
|
|
|
text_prev = text_curr
|
|
|
|
if not is_partial_arg:
|
|
if progressive_continuous:
|
|
text_prev = ""
|
|
else:
|
|
text_list.append(text)
|
|
|
|
handled_any = True
|
|
|
|
# Use code to delay exiting, allowing reading the recording buffer to catch-up.
|
|
code = 0
|
|
|
|
if idle_time > 0.0:
|
|
idle_time_prev = time.time()
|
|
|
|
while code == 0:
|
|
# -1=cancel, 0=continue, 1=finish.
|
|
code = exit_fn(handled_any)
|
|
|
|
if idle_time > 0.0:
|
|
# Subtract processing time from the previous loop.
|
|
# Skip idling in the event dictation can't keep up with the recording.
|
|
idle_time_curr = time.time()
|
|
idle_time_test = idle_time - (idle_time_curr - idle_time_prev)
|
|
if idle_time_test > 0.0:
|
|
# Prevents excessive processor load.
|
|
time.sleep(idle_time_test)
|
|
idle_time_prev = time.time()
|
|
else:
|
|
idle_time_prev = idle_time_curr
|
|
|
|
# Mostly the data read is quite small (under 1k).
|
|
# Only the 1st entry in the loop reads a lot of data due to the time it takes to initialize the VOSK module.
|
|
data = stdout.read(block_size)
|
|
|
|
if data:
|
|
ok = rec.AcceptWaveform(data)
|
|
|
|
if ok:
|
|
json_text = rec.Result()
|
|
json_text_partial_prev = ""
|
|
json_data = json.loads(json_text)
|
|
text = json_data["text"]
|
|
assert isinstance(text, str)
|
|
if text:
|
|
handle_fn_wrapper(text, False)
|
|
else:
|
|
# Only for comparison, to detect change.
|
|
# if use_timeout:
|
|
json_text = rec.PartialResult()
|
|
# Without this, there are *many* calls with the same partial text.
|
|
if json_text_partial_prev != json_text:
|
|
json_text_partial_prev = json_text
|
|
|
|
json_data = json.loads(json_text)
|
|
text = json_data["partial"]
|
|
if text:
|
|
handle_fn_wrapper(text, True)
|
|
|
|
# Monitor the partial output.
|
|
# Finish if no changes are made for `timeout` seconds.
|
|
if use_timeout:
|
|
if json_text != timeout_text_prev:
|
|
timeout_text_prev = json_text
|
|
timeout_time_prev = time.time()
|
|
elif time.time() - timeout_time_prev > timeout:
|
|
if code == 0:
|
|
code = 1 # The time was exceeded, exit!
|
|
|
|
# Close the recording process.
|
|
import signal
|
|
|
|
os.kill(ps.pid, signal.SIGINT)
|
|
|
|
if code == -1:
|
|
sys.stderr.write("Text input canceled!\n")
|
|
sys.exit(0)
|
|
|
|
# This writes many JSON blocks, use the last one.
|
|
json_text = rec.FinalResult()
|
|
json_data = json.loads(json_text)
|
|
text = json_data["text"]
|
|
assert isinstance(text, str)
|
|
if text:
|
|
handle_fn_wrapper(text, False)
|
|
|
|
if not progressive:
|
|
# We never arrive here needing deletions
|
|
handle_fn(process_fn(" ".join(text_list)), 0)
|
|
|
|
return handled_any
|
|
|
|
|
|
def main_begin(
|
|
*,
|
|
vosk_model_dir: str,
|
|
path_to_cookie: str = "",
|
|
pulse_device_name: str = "",
|
|
sample_rate: int = 44100,
|
|
input_method: str = "PAREC",
|
|
progressive: bool = True,
|
|
progressive_continuous: bool = False,
|
|
full_sentence: bool = False,
|
|
numbers_as_digits: bool = False,
|
|
numbers_use_separator: bool = False,
|
|
timeout: float = 0.0,
|
|
idle_time: float = 0.1,
|
|
delay_exit: float = 0.0,
|
|
punctuate_from_previous_timeout: float = 0.0,
|
|
config_override: Optional[str],
|
|
output: str = "SIMULATE_INPUT",
|
|
) -> None:
|
|
"""
|
|
Initialize audio recording, then full text to speech conversion can take place.
|
|
|
|
This is terminated by the ``end`` or ``cancel`` actions.
|
|
"""
|
|
|
|
# Find language model in:
|
|
# - `--vosk-model-dir=...`
|
|
# - `~/.config/nerd-dictation/model`
|
|
if not vosk_model_dir:
|
|
vosk_model_dir = calc_user_config_path("model")
|
|
# If this still doesn't exist the error is handled later.
|
|
|
|
#
|
|
# Initialize the recording state and perform some sanity checks.
|
|
#
|
|
if not path_to_cookie:
|
|
path_to_cookie = os.path.join(tempfile.gettempdir(), TEMP_COOKIE_NAME)
|
|
|
|
is_run_on = False
|
|
if punctuate_from_previous_timeout > 0.0:
|
|
age_in_seconds: Optional[float] = None
|
|
try:
|
|
age_in_seconds = file_age_in_seconds(path_to_cookie)
|
|
except FileNotFoundError:
|
|
age_in_seconds = None
|
|
is_run_on = age_in_seconds is not None and (age_in_seconds < punctuate_from_previous_timeout)
|
|
del age_in_seconds
|
|
|
|
# Force zero time-stamp so a fast begin/end (tap) action
|
|
# doesn't leave dictation running.
|
|
touch(path_to_cookie, mtime=0)
|
|
cookie_timestamp = file_mtime_or_none(path_to_cookie)
|
|
if cookie_timestamp != 0:
|
|
sys.stderr.write("Cookie removed after right after creation (unlikely but respect the request)\n")
|
|
return
|
|
|
|
#
|
|
# Start recording the output file.
|
|
#
|
|
|
|
touch_mtime = None
|
|
use_overtime = delay_exit > 0.0 and timeout == 0.0
|
|
|
|
# Lazy loaded so recording can start 1st.
|
|
user_config = None
|
|
|
|
def exit_fn(handled_any: bool) -> int:
|
|
nonlocal touch_mtime
|
|
if not os.path.exists(path_to_cookie):
|
|
return -1 # Cancel.
|
|
if file_mtime_or_none(path_to_cookie) != cookie_timestamp:
|
|
|
|
# Only delay exit if some text has been handled,
|
|
# this prevents accidental tapping of push to talk from running.
|
|
if handled_any:
|
|
# Implement `delay_exit` workaround.
|
|
if use_overtime:
|
|
if touch_mtime is None:
|
|
touch_mtime = time.time()
|
|
if time.time() - touch_mtime < delay_exit:
|
|
# Continue until `delay_exit` is reached.
|
|
return 0
|
|
# End `delay_exit`.
|
|
|
|
return 1 # End.
|
|
return 0 # Continue.
|
|
|
|
process_fn_is_first = True
|
|
|
|
def process_fn(text: str) -> str:
|
|
nonlocal user_config
|
|
nonlocal process_fn_is_first
|
|
|
|
#
|
|
# Load the user configuration (when found).
|
|
#
|
|
if process_fn_is_first:
|
|
user_config = user_config_as_module_or_none(config_override=config_override)
|
|
|
|
#
|
|
# User text post processing (when found).
|
|
#
|
|
if user_config is not None:
|
|
text = process_text_with_user_config(user_config, text)
|
|
|
|
#
|
|
# Simple text post processing and capitalization.
|
|
#
|
|
text = process_text(
|
|
text,
|
|
full_sentence=full_sentence,
|
|
numbers_as_digits=numbers_as_digits,
|
|
numbers_use_separator=numbers_use_separator,
|
|
)
|
|
|
|
if is_run_on:
|
|
# This is a signal that the end of the sentence has been reached.
|
|
if full_sentence:
|
|
text = ". " + text
|
|
else:
|
|
text = ", " + text
|
|
|
|
process_fn_is_first = False
|
|
|
|
return text
|
|
|
|
#
|
|
# Handled the resulting text
|
|
#
|
|
if output == "SIMULATE_INPUT":
|
|
|
|
def handle_fn(text: str, delete_prev_chars: int) -> None:
|
|
if delete_prev_chars:
|
|
run_xdotool("key", ["BackSpace"] * delete_prev_chars)
|
|
run_xdotool("type", ["--", text])
|
|
|
|
elif output == "STDOUT":
|
|
|
|
def handle_fn(text: str, delete_prev_chars: int) -> None:
|
|
if delete_prev_chars:
|
|
sys.stdout.write("\x08" * delete_prev_chars)
|
|
sys.stdout.write(text)
|
|
|
|
else:
|
|
# Unreachable.
|
|
assert False
|
|
|
|
found_any = text_from_vosk_pipe(
|
|
vosk_model_dir=vosk_model_dir,
|
|
pulse_device_name=pulse_device_name,
|
|
sample_rate=sample_rate,
|
|
input_method=input_method,
|
|
timeout=timeout,
|
|
idle_time=idle_time,
|
|
progressive=progressive,
|
|
progressive_continuous=progressive_continuous,
|
|
exit_fn=exit_fn,
|
|
process_fn=process_fn,
|
|
handle_fn=handle_fn,
|
|
)
|
|
|
|
if not found_any:
|
|
sys.stderr.write("No text found in the audio\n")
|
|
# Avoid continuing punctuation from where this recording (which recorded nothing) left off.
|
|
touch(path_to_cookie)
|
|
return
|
|
|
|
|
|
def main_end(
|
|
*,
|
|
path_to_cookie: str = "",
|
|
) -> None:
|
|
if not path_to_cookie:
|
|
path_to_cookie = os.path.join(tempfile.gettempdir(), TEMP_COOKIE_NAME)
|
|
|
|
touch(path_to_cookie)
|
|
|
|
|
|
def main_cancel(
|
|
*,
|
|
path_to_cookie: str = "",
|
|
) -> None:
|
|
if not path_to_cookie:
|
|
path_to_cookie = os.path.join(tempfile.gettempdir(), TEMP_COOKIE_NAME)
|
|
|
|
file_remove_if_exists(path_to_cookie)
|
|
|
|
|
|
def argparse_generic_command_cookie(subparse: argparse.ArgumentParser) -> None:
|
|
subparse.add_argument(
|
|
"--cookie",
|
|
dest="path_to_cookie",
|
|
default="",
|
|
type=str,
|
|
metavar="FILE_PATH",
|
|
help="Location for writing a temporary cookie (this file is monitored to begin/end dictation).",
|
|
required=False,
|
|
)
|
|
|
|
|
|
def argparse_create_begin(subparsers: argparse._SubParsersAction) -> None:
|
|
subparse = subparsers.add_parser(
|
|
"begin",
|
|
help="Begin dictation.",
|
|
description="""\
|
|
This creates the directory used to store internal data, so other commands such as sync can be performed.
|
|
""",
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
)
|
|
|
|
argparse_generic_command_cookie(subparse)
|
|
|
|
subparse.add_argument(
|
|
"--config",
|
|
default=None,
|
|
dest="config",
|
|
type=str,
|
|
metavar="FILE",
|
|
help=(
|
|
"Override the file used for the user configuration\n"
|
|
"Use an empty string to disable a custom configuration."
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--vosk-model-dir",
|
|
default="",
|
|
dest="vosk_model_dir",
|
|
type=str,
|
|
metavar="DIR",
|
|
help=("Path to the VOSK model, see: https://alphacephei.com/vosk/models"),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--pulse-device-name",
|
|
dest="pulse_device_name",
|
|
default="",
|
|
type=str,
|
|
metavar="IDENTIFIER",
|
|
help=(
|
|
"The name of the pulse-audio device to use for recording.\n"
|
|
'See the output of "pactl list sources" to find device names (using the identifier following "Name:").'
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--sample-rate",
|
|
dest="sample_rate",
|
|
default=44100,
|
|
type=int,
|
|
metavar="HZ",
|
|
help=("The sample rate to use for recording (in Hz).\n" "Defaults to 44100."),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--defer-output",
|
|
dest="defer_output",
|
|
default=False,
|
|
action="store_true",
|
|
help=(
|
|
"When enabled, output is deferred until exiting.\n"
|
|
"\n"
|
|
"This prevents text being typed during speech (implied with ``--output=STDOUT``)"
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--continuous",
|
|
dest="progressive_continuous",
|
|
default=False,
|
|
action="store_true",
|
|
help=(
|
|
"Enable this option, when you intend to keep the dictation process enabled for extended periods of time.\n"
|
|
"without this enabled, the entirety of this dictation session will be processed on every update.\n"
|
|
"Only used when ``--defer-output`` is disabled."
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--timeout",
|
|
dest="timeout",
|
|
default=0.0,
|
|
type=float,
|
|
metavar="SECONDS",
|
|
help=(
|
|
"Time out recording when no speech is processed for the time in seconds.\n"
|
|
"This can be used to avoid having to explicitly exit "
|
|
"(zero disables)."
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--idle-time",
|
|
dest="idle_time",
|
|
default=0.1,
|
|
type=float,
|
|
metavar="SECONDS",
|
|
help=(
|
|
"Time to idle between processing audio from the recording.\n"
|
|
"Setting to zero is the most responsive at the cost of high CPU usage.\n"
|
|
"The default value is 0.1 (processing 10 times a second), which is quite responsive in practice\n"
|
|
"(the maximum value is clamped to 0.5)"
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--delay-exit",
|
|
dest="delay_exit",
|
|
default=0.0,
|
|
type=float,
|
|
metavar="SECONDS",
|
|
help=(
|
|
"The time to continue running after an exit request.\n"
|
|
'this can be useful so "push to talk" setups can be released while you finish speaking\n'
|
|
"(zero disables)."
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--punctuate-from-previous-timeout",
|
|
dest="punctuate_from_previous_timeout",
|
|
default=0.0,
|
|
type=float,
|
|
metavar="SECONDS",
|
|
help=(
|
|
"The time-out in seconds for detecting the state of dictation from the previous recording, "
|
|
"this can be useful so punctuation it is added before entering the dictation"
|
|
"(zero disables)."
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--full-sentence",
|
|
dest="full_sentence",
|
|
default=False,
|
|
action="store_true",
|
|
help=(
|
|
"Capitalize the first character.\n"
|
|
"This is also used to add either a comma or a full stop when dictation is performed under the\n"
|
|
"``--punctuate-from-previous-timeout`` value."
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--numbers-as-digits",
|
|
dest="numbers_as_digits",
|
|
default=False,
|
|
action="store_true",
|
|
help=("Convert numbers into digits instead of using whole words."),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--numbers-use-separator",
|
|
dest="numbers_use_separator",
|
|
default=False,
|
|
action="store_true",
|
|
help=("Use a comma separators for numbers."),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--input",
|
|
dest="input_method",
|
|
default="PAREC",
|
|
type=str,
|
|
metavar="INPUT_METHOD",
|
|
choices=("PAREC", "SOX"),
|
|
help=(
|
|
"Specify input method to be used for audio recording. Valid methods: PAREC, SOX\n"
|
|
" PAREC (external command, default)\n"
|
|
" See --pulse-device-name option to use a specific pulse-audio device.\n"
|
|
" SOX (external command)\n"
|
|
" Set environment variable AUDIODEV to use a specific input device.\n"
|
|
" Other sox options can be set (such as gain) by setting environment variable SOX_OPTS.\n"
|
|
" You can test various devices by: \n"
|
|
" arecord -l || cat /proc/asound/cards || cat /dev/sndstat # List audio devices.\n"
|
|
" # Example, use card 2, subdevice 0. Record 10 seconds and playback to default output,\n"
|
|
" AUDIODEV='hw:2,0' sox -d --buffer 1000 -r 16000 -b 16 -e signed-integer -c 1 -t wav -L test.wav trim 0 10\n"
|
|
" sox test.wav -d"
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"--output",
|
|
dest="output",
|
|
default="SIMULATE_INPUT",
|
|
choices=("SIMULATE_INPUT", "STDOUT"),
|
|
metavar="OUTPUT_METHOD",
|
|
help=(
|
|
"Method used to at put the result of speech to text.\n"
|
|
"\n"
|
|
"- ``SIMULATE_INPUT`` simulate keystrokes (default).\n"
|
|
"- ``STDOUT`` print the result to the standard output.\n"
|
|
" Be sure only to handle text from the standard output\n"
|
|
" as the standard error may be used for reporting any problems that occur.\n"
|
|
),
|
|
required=False,
|
|
)
|
|
|
|
subparse.add_argument(
|
|
"-",
|
|
dest="rest",
|
|
default=False,
|
|
nargs=argparse.REMAINDER,
|
|
help=(
|
|
"End argument parsing.\n"
|
|
"This can be used for user defined arguments which configuration scripts may read from the ``sys.argv``."
|
|
),
|
|
)
|
|
|
|
subparse.set_defaults(
|
|
func=lambda args: main_begin(
|
|
path_to_cookie=args.path_to_cookie,
|
|
vosk_model_dir=args.vosk_model_dir,
|
|
pulse_device_name=args.pulse_device_name,
|
|
sample_rate=args.sample_rate,
|
|
input_method=args.input_method,
|
|
progressive=not (args.defer_output or args.output == "STDOUT"),
|
|
progressive_continuous=args.progressive_continuous,
|
|
full_sentence=args.full_sentence,
|
|
numbers_as_digits=args.numbers_as_digits,
|
|
numbers_use_separator=args.numbers_use_separator,
|
|
timeout=args.timeout,
|
|
idle_time=min(args.idle_time, 0.5),
|
|
delay_exit=args.delay_exit,
|
|
punctuate_from_previous_timeout=args.punctuate_from_previous_timeout,
|
|
config_override=args.config,
|
|
output=args.output,
|
|
),
|
|
)
|
|
|
|
|
|
def argparse_create_end(subparsers: argparse._SubParsersAction) -> None:
|
|
subparse = subparsers.add_parser(
|
|
"end",
|
|
help="End dictation.",
|
|
description="""\
|
|
This ends dictation, causing the text to be typed in.
|
|
""",
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
)
|
|
|
|
argparse_generic_command_cookie(subparse)
|
|
|
|
subparse.set_defaults(
|
|
func=lambda args: main_end(
|
|
path_to_cookie=args.path_to_cookie,
|
|
),
|
|
)
|
|
|
|
|
|
def argparse_create_cancel(subparsers: argparse._SubParsersAction) -> None:
|
|
subparse = subparsers.add_parser(
|
|
"cancel",
|
|
help="Cancel dictation.",
|
|
description="""\
|
|
This cancels dictation.
|
|
""",
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
)
|
|
|
|
argparse_generic_command_cookie(subparse)
|
|
|
|
subparse.set_defaults(
|
|
func=lambda args: main_cancel(
|
|
path_to_cookie=args.path_to_cookie,
|
|
),
|
|
)
|
|
|
|
|
|
def argparse_create() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
|
|
|
|
subparsers = parser.add_subparsers()
|
|
|
|
argparse_create_begin(subparsers)
|
|
argparse_create_end(subparsers)
|
|
argparse_create_cancel(subparsers)
|
|
|
|
return parser
|
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> None:
|
|
parser = argparse_create()
|
|
args = parser.parse_args(argv)
|
|
# Call sub-parser callback.
|
|
if not hasattr(args, "func"):
|
|
parser.print_help()
|
|
return
|
|
|
|
args.func(args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|