さー AI祭りです

もっともっと、OCIでAIを使い倒して課金されまくりましょうね。

ということで、定番の音声ファイルから文字起こしにチャレンジします。


流石に この手のサービスは、有料が多いのですが、OCIでも出来ちゃいます。

ちょっと、課金されるし、結果は、ムズイJSONで出力されるので、整形する必要がありますね。

でも、そこは、さらに AI になんとかしてもらって 良い感じの結果にしてもらっちゃう、AIだのみの方法です。

OCI 音声 AIは、文字列から音声にするのと、音声から文字列にするサービスとなっていて、今回は、音声から文字列にするサービスを使います。

日本語に対応していないのかと思っていたのですが、Open AI の Whisperなるモデルを利用出来るので、マルチリンガルだそうです。

ということで、実装したのが、これです。

音声ファイルをドロップして、待つこと数分から数十分

どうですか。って、元音声ファイルを聞かないとわかりませんよね。

適当にネットに落ちてたサンプルで試したんですが、めっちゃ良い結果でした。

98% ぐらいの完成度でしたね。

しかも、結果をAIに解釈させたので、元の音声よりまともな文章になっています。

これ使えますよマジで

ということで、ソースです。

 

whisper1.py

import streamlit as st

import time
import json

import oci
from oci.config import from_file
from oci.object_storage import ObjectStorageClient
from oci.ai_speech import AIServiceSpeechClient
from oci.ai_speech.models import CreateTranscriptionJobDetails, ObjectListInlineInputLocation, ObjectLocation, OutputLocation, TranscriptionModelDetails
from oci.generative_ai_inference import GenerativeAiInferenceClient
from oci.generative_ai_inference.models import (
    ChatDetails,
    CohereChatRequest,
    OnDemandServingMode
)
from datetime import datetime

config = oci.config.from_file("~/.oci/config", "DEFAULT")

object_storage = ObjectStorageClient(config)
namespace = object_storage.get_namespace().data

COMPARTMENT_ID = "ocid1.compartment.oc1..aaaaaaaamgmw22hogecwqnunirb3urhoger4ihdgoilkdjkv2sabokaq5svc"

config = oci.config.from_file("~/.oci/config", "DEFAULT")

# 音声情報用バケット関連
object_storage = ObjectStorageClient(config)
namespace = object_storage.get_namespace().data
bucketnm="whisper"

# AIサービス音声クライアントを作成
speech_client = AIServiceSpeechClient(config)

# Generative AIクライアントを作成
aiclient = GenerativeAiInferenceClient(config=config)
MODEL_DEF = "cohere.command-a-03-2025"

st.title('Whisper demo🎤')

uploaded_file = st.file_uploader(
    "音声/動画ファイルをドラッグ&ドロップまたは選択してください", type=['mp3', 'wav', 'mp4', 'm4a', 'aac', 'ogg'])

status_placeholder = st.empty()

# ファイルアップロード
def upload_contents_to_bucket(file_content,object_name):

    try:
        res:oci.response.Response = object_storage.put_object(namespace_name=namespace, bucket_name=bucketnm,
                                object_name=object_name, put_object_body=file_content)
        if res.status == 200:
            print(f"File {object_name} uploaded successfully.")
            return True
        else:
            print(f"Failed to upload file {object_name}. Status code: {res.status}")
            return False
    except Exception as e:
        print(f"An error occurred while uploading file {object_name}: {e}")
        return False    
       
# ファイル削除
def delete_file_from_bucket(object_name):

    res:oci.response.Response = object_storage.delete_object(namespace_name=namespace, bucket_name=bucketnm, object_name=object_name)

    if res.status == 204:
        print(f"Object {object_name} deleted successfully.")
        return True
    else:
        print(f"Failed to delete object {object_name}. Status code: {res.status}")
        return False

# 音声からテキストへ変換
def speech2text(object_name, poolingwait=3):

    start_time = datetime.now()

    # ジョブ詳細定義
    create_transcription_job_details : CreateTranscriptionJobDetails = CreateTranscriptionJobDetails(
        compartment_id=COMPARTMENT_ID,
        display_name="PythonSDKSampleTranscriptionJob",
        description="Transcription job created by Python SDK",
        input_location=ObjectListInlineInputLocation(
            location_type="OBJECT_LIST_INLINE_INPUT_LOCATION",
            object_locations=[ObjectLocation(
                namespace_name=namespace,
                bucket_name=bucketnm,
                object_names=[object_name])]),
        output_location=OutputLocation(
            namespace_name=namespace, bucket_name=bucketnm),
        model_details=TranscriptionModelDetails(
            model_type="WHISPER_MEDIUM", language_code="ja")
    )

    # ジョブ作成
    res:oci.response.Response = speech_client.create_transcription_job( create_transcription_job_details)
    tjob : oci.ai_speech.models.TranscriptionJob = res.data

    st.session_state.job_id = tjob.id

    status_message = ""

    try:

        # ジョブ完了ホーリング
        while True:
            time.sleep(10)

            res = speech_client.get_transcription_job(tjob.id)
            tjob = res.data

            current_time = datetime.now()
            processing_time = current_time - start_time
            processing_time_str = str(processing_time).split('.')[0]

            new_status_message = f"Job Status: {tjob.lifecycle_state} - Processing Time: {processing_time_str}"
            if new_status_message != status_message:
                status_placeholder.write("更新中...")
                time.sleep(1)
                status_placeholder.empty()

            status_placeholder.write(new_status_message)
            status_message = new_status_message

            if tjob.lifecycle_state in ["SUCCEEDED"]:
                # 完了
                print("Transcription job finished.")
                res = speech_client.list_transcription_tasks(tjob.id)
                ttasklist: oci.ai_speech.models.TranscriptionTaskCollection = res.data
                tasksummary : oci.ai_speech.models.TranscriptionTaskSummary

                result = []
                for tasksummary in ttasklist.items:
                    res = speech_client.get_transcription_task(tjob.id,tasksummary.id)
                    ttask: oci.ai_speech.models.TranscriptionTask = res.data
                    outputlocation = ttask.output_location
                    outbucketname = outputlocation.bucket_name
                    outnamespace = outputlocation.namespace_name
                    outobjects = outputlocation.object_names
                    for outobject in outobjects:
                        print(f"Output object: {outobject} in bucket {outbucketname} namespace {outnamespace}")

                        res = object_storage.get_object(namespace_name=outnamespace, bucket_name=outbucketname, object_name=outobject)
                        responce : oci.response.Response = res
                        if( responce.status != 200 ):
                            print(f"Failed to get object {outobject}. Status code: {responce.status}")
                            continue
                        outputresut = json.load(responce.data.raw)
                        #結果を削除
                        object_storage.delete_object(namespace_name=outnamespace, bucket_name=outbucketname, object_name=outobject)

                        #結果出力
                        result.append(outputresut)

                return result

            else :
                if tjob.lifecycle_state in ["FAILED", "CANCELING", "CANCELED"]:
                    # 失敗
                    print(f"Transcription job failed with status: {tjob.lifecycle_state}")
                    return None

        return None
    finally:
        print("Transcription job ended.")
        speech_client.delete_transcription_job(tjob.id)

# 結果テキストをAIで、自然言語へ変換する
def parse_talking( textjson ) :

    status_placeholder.write("自然言語へ変換中...")

    msg = "音声から文字列に変換された結果が格納されたJSONデータを自然な言葉に変換して下さい。\n"
    msg += json.dumps(textjson, ensure_ascii=False, indent=2)

    # チャットリクエストの作成
    chat_request = CohereChatRequest(
        message=msg,
        max_tokens=1000,
        temperature=0.7,
        is_echo=True,
        is_stream=False
    )

    # サービングモードの指定(利用モデルIDを正しく指定)
    serving_mode = OnDemandServingMode(
        model_id=MODEL_DEF
    )

    # チャット詳細情報をまとめる
    chat_details = ChatDetails(
        compartment_id=COMPARTMENT_ID,
        chat_request=chat_request,
        serving_mode=serving_mode
    )

    # チャットAPIを呼び出す
    response = aiclient.chat(chat_details)

    # ここに音声からのテキスト変換結果を解析するロジックを追加
    return response.data.chat_response.text


if uploaded_file is not None:

    object_name = uploaded_file.name
    if( upload_contents_to_bucket(uploaded_file.getvalue(),object_name) == False ):
        st.error(f"ファイル '{object_name}' のアップロードに失敗しました。")
    else:
        st.success(f"ファイル '{object_name}' がバケット '{bucketnm}' にアップロードされました!")
        st.session_state.uploaded_file_name = object_name
        st.write("OCI Speechの処理を開始します。")

        with st.spinner('処理中です。しばらくお待ちください...'):
            textjson = speech2text(object_name)
            delete_file_from_bucket(object_name)

            finalresult = parse_talking(textjson)

            st.markdown(finalresult, unsafe_allow_html=True)

else:
    st.write("音声/動画ファイルをアップロードして、処理を開始してください。")
   

 

CompartMentは、自身のに変えて下さい。

オブジェクトストレージを使うので、バケットwhisperを作成しておいて下さい。

で、起動は、

streamlit run whisper1.py --server.port 8501 --server.enableCORS false

です。

当然ながら、streamlit 環境と OCI SDK 環境は、構築して下さい。

Windowsだったら、WSL+Ubuntu24.04 + python3 + OCI CLI + OCI python SDK で、構築出来ますよ。

 

Joomla templates by a4joomla