AnyTech Engineer Blog

AnyTech Engineer Blogは、AnyTechのエンジニアたちによる調査や成果、Tipsなどを公開するブログです。

JARVIS(っぽい何か)を作ろう!第五回:今までで作ったものを組み合わせる

JARVIS(っぽい何か)を作ろう!第五回:今までで作ったものを組み合わせる

はじめに

本記事はおしゃべりAIをオフラインかつローカルで実装するシリーズ第五回です。
前回までは、VRoidでキャラクターを作成してUnityで口パクできるプロジェクトを作成しました。今回は今まで作ったプログラムを編集し直してつなげ合わせ、使用者が声でキャラクターを対話できるようにします。

前回

tech.anytech.co.jp

シリーズ

こちらの手順を踏まえて、作っていきたいと思います。
※ 本シリーズはローカルUbuntuマシンにGPUがある場合を想定しております。

第一回:マイクからリアルタイムで音声認識する
第二回:音声認識した結果から返答生成する
第三回:返答を合成音声で喋らせる
第四回:キャラクターが喋っているような見た目を作る
第五回:今までで作ったものを組み合わせる ← 今回はこちらになります。

今回の目次

  1. プログラム構成
  2. 音声認識プログラム変更
  3. テキスト生成プログラム変更
  4. 合成音声プログラム変更
  5. すべてのプログラム実行

1. プログラム構成

音声認識、テキスト生成、音声合成をするプログラムを以下のディレクトリ構造にまとめます。

./
├── Dockerfile
├── models
│   ├── ChatRWKV
│   ├── rwkv
│   ├── sr
│   └── tts
├── pkg
│   └── ninja-linux.zip
├── scripts
│   └── docker_run.sh
├── sr
│   └── voice_recognize.py
├── tts
│   └── voice_synthesis.py
├── ttt
│   └── text_generation.py
└── utils
    └── prompt_utils.py

2. 音声認識プログラム変更

第一回:マイクからリアルタイムで音声認識するで作成したプログラムtools/voice_recognize.pysr/voice_recognize.pyに移動し、以下のように編集します。

import os
import sys
import soundfile
import pyaudio
import wave
import numpy as np
import socket
from contextlib import closing
from espnet_model_zoo.downloader import ModelDownloader
os.path.join(os.getcwd())
sys.path.append('./')
from espnet2.bin.asr_inference import Speech2Text
#apt install portaudio19-dev
FORMAT = pyaudio.paInt16

# Audio params
chunk = 1024  #1つの音声チャンクの長さを1024サンプルとする
threshold = 0.50  #マイクの音量がこれを超えると録音を開始する
sampling_rate = 16000  #サンプリングレート、マイク性能に依存
record_seconds = 15  #録音時間の上限
record_cache_seconds = 0.9
stop_seconds = 2  #録音中断までの時間
B = int(sampling_rate / chunk * stop_seconds)
A = int(sampling_rate) / int(chunk) * int(record_seconds)
A_CACHE = int(sampling_rate) / int(chunk) * int(record_cache_seconds)

machine_list = ['docker']
ip_list = ['192.168.66.66']
module_list = ['t2t']
port_list = [13103]


def send_text(text, host, port):
    sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
    with closing(sock):
        message = '{}'.format(text).encode('utf-8')
        sock.sendto(message,(host,port))
    return


def load_voice_recog_model():
    d = ModelDownloader()
    model = Speech2Text(
        **d.download_and_unpack("Shinji Watanabe/laborotv_asr_train_asr_conformer2_latest33_raw_char_sp_valid.acc.ave"),
        device="cuda"
    )
    return model


def save_wave(all, audio_path):
    data = b''.join(all)
    out = wave.open(audio_path,'w')
    out.setnchannels(1) #mono
    out.setsampwidth(2) #16bits
    out.setframerate(sampling_rate)
    out.writeframes(data)
    out.close()


def inference(speech2text, audio_path):
    speech, _ = soundfile.read(audio_path)
    nbests = speech2text(speech)
    text, *_ = nbests[0]
    return text


def recognize(speech2text):
    audio_path = "speech.wav"
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=1, rate=sampling_rate, input=True, frames_per_buffer=chunk)
    all = []
    while True:
        #まずサンプルを取る
        data = stream.read(chunk, exception_on_overflow=False)
        x = np.frombuffer(data, dtype="int16") / 32768.0
        xmax = x.max()
        print(xmax)
        if xmax <= threshold:
            all = []
            for i in range(0, int(A_CACHE)):
                data_cache = stream.read(chunk, exception_on_overflow=False)
                all.append(data_cache)
        else:
            n = 0  #連続して閾値を下回った回数を記録するための変数
            for i in range(0, int(A)):
                data = stream.read(chunk, exception_on_overflow=False)
                all.append(data)
                x = np.frombuffer(data, dtype="int16") / 32768.0
                xmax = x.max()
                if xmax <= threshold:
                    n += 1  #閾値を下回っていたらincrementする
                else:
                    n = 0  #閾値を上回ったらリセットする
                # 中断判定
                if n == B:
                    break
            # 音声保存
            save_wave(all, audio_path)
            all = []

            # 音声認識
            text = inference(sr_model, audio_path)
            print(f"Recognized text = {text}")

            # 認識した文字数が3以上ならtext to textモデル環境に送信する
            if len(text) > 2:
                send_text(text,
                      ip_list[machine_list.index('docker')],
                      port_list[module_list.index('t2t')])


if __name__ == '__main__':
    sr_model = load_voice_recog_model()
    recognize(sr_model)

3. テキスト生成プログラム変更

第二回:音声認識した結果から返答生成するで作成したプログラムtools/text_generation.pyttt/text_generation.pyに移動し、以下のように編集します。

import os
import sys
from socket import socket, AF_INET, SOCK_DGRAM
os.path.join(os.getcwd())
sys.path.append('./')
from models.ChatRWKV.rwkv_pip_package.src.rwkv.model import RWKV
from rwkv.utils import PIPELINE, PIPELINE_ARGS
from utils.prompt_utils import generate_prompt
os.environ['RWKV_JIT_ON'] = '1'
os.environ["RWKV_CUDA_ON"] = '1'
model_path = 'models/rwkv/RWKV-4-Raven-3B-v8-EngAndMore-20230408-ctx4096.pth'

module_list = ['t2t']
port_list = [13103]

# プロンプト
input=""

class RWKV4Raven():
    def __init__(self):
        self.input = input
        self.last_input = ""
        self.model = RWKV(model=model_path, strategy='cuda fp16')
        self.pipeline = PIPELINE(self.model, "models/rwkv/20B_tokenizer.json")
        self.args = PIPELINE_ARGS(temperature = 1.0, top_p = 0.999, top_k = 100,
                                  alpha_frequency = 0.25, alpha_presence = 0.25,
                                  token_ban = [0],
                                  token_stop = [],
                                  chunk_len = 512)

    def generate(self, msg):
        res = ""
        if len(msg) > 0:
            res = self.pipeline.generate(generate_prompt(instruction=msg, input=self.input),
                                         token_count=80, args=self.args)
            # お好み後処理、改行前の文のみ
            if "\n" in res:
                res = res.split("\n")[0]
        return res


def save_text(path, text):
    f = open(path, 'w')
    f.write('{}\n'.format(text))
    f.close()


if __name__ == '__main__':
    # Set port setting to ear module
    s = socket(AF_INET, SOCK_DGRAM)
    s.bind(('', port_list[module_list.index('t2t')]))

    # Load model
    rwkv = RWKV4Raven()

    res_path = 'res.txt'

    print('=> Waiting for speech recognition...')
    while True:
        # Receive msg
        msg, address = s.recvfrom(8192)
        msg = msg.decode("utf-8")

        # Inference
        res = rwkv.generate(msg)
        print(f'msg = {msg}')
        print(f'res = {res}')
        save_text(res_path, res)

4. 合成音声プログラム変更

第三回:返答を合成音声で喋らせるで作成したプログラムtools/voice_synthesis.pytts/voice_synthesis.pyに移動し、以下のように編集します。

import os
import sys
import time
import random
import soundfile as sf
import simpleaudio as sa
import socket
import multiprocessing
from multiprocessing import Manager, Value, Process
os.path.join(os.getcwd())
sys.path.append('./')
from espnet2.bin.tts_inference import Text2Speech

machines = ['localhost', 'mac']
ips = ['', '192.168.11.14']
modules = ['mouth_module']
ports = [11121]
host = 'mac'
sock_lip = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

def lip_control(is_talking, mouth, host):
    while True:
        # Vary lip and emotion
        if is_talking.value == 1:
            # Random lip sync
            mouth.value = random.randrange(0, 10, 1) * 0.1
        else:
            # Close mouth
            mouth.value = 0.0
        # Send to Unity
        try:
            send_lip(mouth.value, sock_lip,
                     ips[machines.index(host)], ports[modules.index('mouth_module')])
        except:
            print('send_lip BrokenPipeError')


def send_lip(mouse_status, sock, host, port):
    # mouse_status: 0 or 1
    message = '{0}'.format(mouse_status).encode('utf-8')
    sock.sendto(message,(host,port))
    time.sleep(0.1)
    return


def load_tts(model_path):
    fs, lang = 44100, "Japanese"
    text2speech = Text2Speech(
        model_file=model_path,
        device="cuda",
        speed_control_alpha=1.2,
        noise_scale=0.333,
        noise_scale_dur=0.333,
    )
    return text2speech


def rmfile(file_path):
    if os.path.exists(file_path):
        os.remove(file_path)

def load_text(path):
    text = ''
    if os.path.exists(path):
        with open(path) as f:
            for line in f:
                line = line.rstrip()
                if len(line) > 0:
                    text = line
    return text


def main():
    # Load espnet
    model_path = "models/tts/exp/tts_finetune_full_band_jsut_vits_raw_phn_jaconv_pyopenjtalk_prosody/100epoch.pth"
    text2speech = load_tts(model_path)

    # 読み上げる文章
    res_path = 'res.txt'

    with Manager() as manager:
        is_talking = manager.Value('i', 0)
        mouth = manager.Value('d', 0.0)

        # Unityへ口の開度を送信する
        lip_p = Process(target=lip_control, args=[is_talking, mouth, host])
        lip_p.start()

        while True:
            if os.path.exists(res_path):
                res = load_text(res_path)
                # 推論
                wav = text2speech(res)["wav"]
                # 音声保存
                audio_path = "talk.wav"
                sf.write(audio_path, wav.data.cpu().numpy(), text2speech.fs, "PCM_16")
                print(f'voice synthesis: {res}')

                # 保存した音声を読み込み
                wave_obj = sa.WaveObject.from_wave_file(audio_path)
                # 再生
                is_talking.value = 1
                play_obj = wave_obj.play()
                play_obj.wait_done()
                is_talking.value = 0
                time.sleep(3)
                rmfile(res_path)

    lip_p.terminate()

5. すべてのプログラム実行

Unityプロジェクトは前回と同様です。 2, 3, 4で変更したプログラムを個別のターミナルで起動します。

python sr/voice_recognize.py
python ttt/text_generation.py
python tts/voice_synthesis.py

Unity画面