はじめに
本記事はおしゃべりAIをオフラインかつローカルで実装するシリーズ第五回です。
前回までは、VRoidでキャラクターを作成してUnityで口パクできるプロジェクトを作成しました。今回は今まで作ったプログラムを編集し直してつなげ合わせ、使用者が声でキャラクターを対話できるようにします。
前回
シリーズ
こちらの手順を踏まえて、作っていきたいと思います。
※ 本シリーズはローカルUbuntuマシンにGPUがある場合を想定しております。
第一回:マイクからリアルタイムで音声認識する
第二回:音声認識した結果から返答生成する
第三回:返答を合成音声で喋らせる
第四回:キャラクターが喋っているような見た目を作る
第五回:今までで作ったものを組み合わせる ← 今回はこちらになります。
今回の目次
- プログラム構成
- 音声認識プログラム変更
- テキスト生成プログラム変更
- 合成音声プログラム変更
- すべてのプログラム実行
1. プログラム構成
音声認識、テキスト生成、音声合成をするプログラムを以下のディレクトリ構造にまとめます。
./ ├── Dockerfile ├── models │ ├── ChatRWKV │ ├── rwkv │ ├── sr │ └── tts ├── pkg │ └── ninja-linux.zip ├── scripts │ └── docker_run.sh ├── sr │ └── voice_recognize.py ├── tts │ └── voice_synthesis.py ├── ttt │ └── text_generation.py └── utils └── prompt_utils.py
2. 音声認識プログラム変更
第一回:マイクからリアルタイムで音声認識するで作成したプログラムtools/voice_recognize.py
をsr/voice_recognize.py
に移動し、以下のように編集します。
import os import sys import soundfile import pyaudio import wave import numpy as np import socket from contextlib import closing from espnet_model_zoo.downloader import ModelDownloader os.path.join(os.getcwd()) sys.path.append('./') from espnet2.bin.asr_inference import Speech2Text #apt install portaudio19-dev FORMAT = pyaudio.paInt16 # Audio params chunk = 1024 #1つの音声チャンクの長さを1024サンプルとする threshold = 0.50 #マイクの音量がこれを超えると録音を開始する sampling_rate = 16000 #サンプリングレート、マイク性能に依存 record_seconds = 15 #録音時間の上限 record_cache_seconds = 0.9 stop_seconds = 2 #録音中断までの時間 B = int(sampling_rate / chunk * stop_seconds) A = int(sampling_rate) / int(chunk) * int(record_seconds) A_CACHE = int(sampling_rate) / int(chunk) * int(record_cache_seconds) machine_list = ['docker'] ip_list = ['192.168.66.66'] module_list = ['t2t'] port_list = [13103] def send_text(text, host, port): sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) with closing(sock): message = '{}'.format(text).encode('utf-8') sock.sendto(message,(host,port)) return def load_voice_recog_model(): d = ModelDownloader() model = Speech2Text( **d.download_and_unpack("Shinji Watanabe/laborotv_asr_train_asr_conformer2_latest33_raw_char_sp_valid.acc.ave"), device="cuda" ) return model def save_wave(all, audio_path): data = b''.join(all) out = wave.open(audio_path,'w') out.setnchannels(1) #mono out.setsampwidth(2) #16bits out.setframerate(sampling_rate) out.writeframes(data) out.close() def inference(speech2text, audio_path): speech, _ = soundfile.read(audio_path) nbests = speech2text(speech) text, *_ = nbests[0] return text def recognize(speech2text): audio_path = "speech.wav" p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=1, rate=sampling_rate, input=True, frames_per_buffer=chunk) all = [] while True: #まずサンプルを取る data = stream.read(chunk, exception_on_overflow=False) x = np.frombuffer(data, dtype="int16") / 32768.0 xmax = x.max() print(xmax) if xmax <= threshold: all = [] for i in range(0, int(A_CACHE)): data_cache = stream.read(chunk, exception_on_overflow=False) all.append(data_cache) else: n = 0 #連続して閾値を下回った回数を記録するための変数 for i in range(0, int(A)): data = stream.read(chunk, exception_on_overflow=False) all.append(data) x = np.frombuffer(data, dtype="int16") / 32768.0 xmax = x.max() if xmax <= threshold: n += 1 #閾値を下回っていたらincrementする else: n = 0 #閾値を上回ったらリセットする # 中断判定 if n == B: break # 音声保存 save_wave(all, audio_path) all = [] # 音声認識 text = inference(sr_model, audio_path) print(f"Recognized text = {text}") # 認識した文字数が3以上ならtext to textモデル環境に送信する if len(text) > 2: send_text(text, ip_list[machine_list.index('docker')], port_list[module_list.index('t2t')]) if __name__ == '__main__': sr_model = load_voice_recog_model() recognize(sr_model)
3. テキスト生成プログラム変更
第二回:音声認識した結果から返答生成するで作成したプログラムtools/text_generation.py
をttt/text_generation.py
に移動し、以下のように編集します。
import os import sys from socket import socket, AF_INET, SOCK_DGRAM os.path.join(os.getcwd()) sys.path.append('./') from models.ChatRWKV.rwkv_pip_package.src.rwkv.model import RWKV from rwkv.utils import PIPELINE, PIPELINE_ARGS from utils.prompt_utils import generate_prompt os.environ['RWKV_JIT_ON'] = '1' os.environ["RWKV_CUDA_ON"] = '1' model_path = 'models/rwkv/RWKV-4-Raven-3B-v8-EngAndMore-20230408-ctx4096.pth' module_list = ['t2t'] port_list = [13103] # プロンプト input="" class RWKV4Raven(): def __init__(self): self.input = input self.last_input = "" self.model = RWKV(model=model_path, strategy='cuda fp16') self.pipeline = PIPELINE(self.model, "models/rwkv/20B_tokenizer.json") self.args = PIPELINE_ARGS(temperature = 1.0, top_p = 0.999, top_k = 100, alpha_frequency = 0.25, alpha_presence = 0.25, token_ban = [0], token_stop = [], chunk_len = 512) def generate(self, msg): res = "" if len(msg) > 0: res = self.pipeline.generate(generate_prompt(instruction=msg, input=self.input), token_count=80, args=self.args) # お好み後処理、改行前の文のみ if "\n" in res: res = res.split("\n")[0] return res def save_text(path, text): f = open(path, 'w') f.write('{}\n'.format(text)) f.close() if __name__ == '__main__': # Set port setting to ear module s = socket(AF_INET, SOCK_DGRAM) s.bind(('', port_list[module_list.index('t2t')])) # Load model rwkv = RWKV4Raven() res_path = 'res.txt' print('=> Waiting for speech recognition...') while True: # Receive msg msg, address = s.recvfrom(8192) msg = msg.decode("utf-8") # Inference res = rwkv.generate(msg) print(f'msg = {msg}') print(f'res = {res}') save_text(res_path, res)
4. 合成音声プログラム変更
第三回:返答を合成音声で喋らせるで作成したプログラムtools/voice_synthesis.py
をtts/voice_synthesis.py
に移動し、以下のように編集します。
import os import sys import time import random import soundfile as sf import simpleaudio as sa import socket import multiprocessing from multiprocessing import Manager, Value, Process os.path.join(os.getcwd()) sys.path.append('./') from espnet2.bin.tts_inference import Text2Speech machines = ['localhost', 'mac'] ips = ['', '192.168.11.14'] modules = ['mouth_module'] ports = [11121] host = 'mac' sock_lip = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) def lip_control(is_talking, mouth, host): while True: # Vary lip and emotion if is_talking.value == 1: # Random lip sync mouth.value = random.randrange(0, 10, 1) * 0.1 else: # Close mouth mouth.value = 0.0 # Send to Unity try: send_lip(mouth.value, sock_lip, ips[machines.index(host)], ports[modules.index('mouth_module')]) except: print('send_lip BrokenPipeError') def send_lip(mouse_status, sock, host, port): # mouse_status: 0 or 1 message = '{0}'.format(mouse_status).encode('utf-8') sock.sendto(message,(host,port)) time.sleep(0.1) return def load_tts(model_path): fs, lang = 44100, "Japanese" text2speech = Text2Speech( model_file=model_path, device="cuda", speed_control_alpha=1.2, noise_scale=0.333, noise_scale_dur=0.333, ) return text2speech def rmfile(file_path): if os.path.exists(file_path): os.remove(file_path) def load_text(path): text = '' if os.path.exists(path): with open(path) as f: for line in f: line = line.rstrip() if len(line) > 0: text = line return text def main(): # Load espnet model_path = "models/tts/exp/tts_finetune_full_band_jsut_vits_raw_phn_jaconv_pyopenjtalk_prosody/100epoch.pth" text2speech = load_tts(model_path) # 読み上げる文章 res_path = 'res.txt' with Manager() as manager: is_talking = manager.Value('i', 0) mouth = manager.Value('d', 0.0) # Unityへ口の開度を送信する lip_p = Process(target=lip_control, args=[is_talking, mouth, host]) lip_p.start() while True: if os.path.exists(res_path): res = load_text(res_path) # 推論 wav = text2speech(res)["wav"] # 音声保存 audio_path = "talk.wav" sf.write(audio_path, wav.data.cpu().numpy(), text2speech.fs, "PCM_16") print(f'voice synthesis: {res}') # 保存した音声を読み込み wave_obj = sa.WaveObject.from_wave_file(audio_path) # 再生 is_talking.value = 1 play_obj = wave_obj.play() play_obj.wait_done() is_talking.value = 0 time.sleep(3) rmfile(res_path) lip_p.terminate()
5. すべてのプログラム実行
Unityプロジェクトは前回と同様です。 2, 3, 4で変更したプログラムを個別のターミナルで起動します。
python sr/voice_recognize.py python ttt/text_generation.py python tts/voice_synthesis.py