import streamlit as st
import time
import json
import oci
from oci.config import from_file
from oci.object_storage import ObjectStorageClient
from oci.ai_speech import AIServiceSpeechClient
from oci.ai_speech.models import CreateTranscriptionJobDetails, ObjectListInlineInputLocation, ObjectLocation, OutputLocation, TranscriptionModelDetails
from oci.generative_ai_inference import GenerativeAiInferenceClient
from oci.generative_ai_inference.models import (
ChatDetails,
CohereChatRequest,
OnDemandServingMode
)
from datetime import datetime
# --- OCI configuration and shared clients ------------------------------------
# Load the DEFAULT profile once and reuse it for every client below. The
# config, Object Storage client and namespace were previously each created
# twice, which cost an extra get_namespace() round trip on every script run.
config = oci.config.from_file("~/.oci/config", "DEFAULT")
COMPARTMENT_ID = "ocid1.compartment.oc1..aaaaaaaamgmw22hogecwqnunirb3urhoger4ihdgoilkdjkv2sabokaq5svc"

# Object Storage client, namespace and bucket used for the uploaded audio and
# for the transcription output.
object_storage = ObjectStorageClient(config)
namespace = object_storage.get_namespace().data
bucketnm = "whisper"

# OCI Speech client used to run transcription jobs.
speech_client = AIServiceSpeechClient(config)

# Generative AI inference client and the chat model it targets.
aiclient = GenerativeAiInferenceClient(config=config)
MODEL_DEF = "cohere.command-a-03-2025"

# --- Streamlit page layout ----------------------------------------------------
st.title('Whisper demo🎤')
uploaded_file = st.file_uploader(
    "音声/動画ファイルをドラッグ&ドロップまたは選択してください",
    type=['mp3', 'wav', 'mp4', 'm4a', 'aac', 'ogg'])
# Placeholder that speech2text() and parse_talking() overwrite with progress text.
status_placeholder = st.empty()
# Upload a file to the bucket
def upload_contents_to_bucket(file_content, object_name):
    """Store ``file_content`` in the bucket under ``object_name``.

    Args:
        file_content: Body handed to ``put_object`` as ``put_object_body``.
        object_name: Name of the object to create in the bucket.

    Returns:
        True when the put request answers with status 200; False for any
        other status or when an exception is raised. The outcome is also
        printed to stdout.
    """
    try:
        response = object_storage.put_object(
            namespace_name=namespace,
            bucket_name=bucketnm,
            object_name=object_name,
            put_object_body=file_content,
        )
        # Guard clause: anything other than 200 is reported as a failure.
        if response.status != 200:
            print(f"Failed to upload file {object_name}. Status code: {response.status}")
            return False
        print(f"File {object_name} uploaded successfully.")
        return True
    except Exception as e:
        print(f"An error occurred while uploading file {object_name}: {e}")
        return False
# Delete a file from the bucket
def delete_file_from_bucket(object_name):
    """Remove ``object_name`` from the bucket.

    Deletion is a cleanup step, so failures are reported and turned into a
    False return value (the same contract as ``upload_contents_to_bucket``)
    instead of propagating and aborting the caller.

    Args:
        object_name: Name of the object to delete.

    Returns:
        True when the delete request answers with status 204; False for any
        other status or when the request raises.
    """
    try:
        res = object_storage.delete_object(
            namespace_name=namespace,
            bucket_name=bucketnm,
            object_name=object_name,
        )
    except Exception as e:
        # Best-effort cleanup: report the problem and let the caller carry on.
        print(f"An error occurred while deleting object {object_name}: {e}")
        return False

    if res.status == 204:
        print(f"Object {object_name} deleted successfully.")
        return True

    print(f"Failed to delete object {object_name}. Status code: {res.status}")
    return False
# Collect the output of a finished transcription job
def _collect_transcription_results(job_id):
    """Download, parse and clean up every output object of a transcription job.

    Args:
        job_id: OCID of a transcription job that has reached SUCCEEDED.

    Returns:
        A list with one parsed JSON document per output object that could be
        fetched. Objects whose download does not answer 200 are skipped.
    """
    results = []
    task_collection = speech_client.list_transcription_tasks(job_id).data
    for task_summary in task_collection.items:
        task = speech_client.get_transcription_task(job_id, task_summary.id).data
        location = task.output_location
        out_bucket = location.bucket_name
        out_namespace = location.namespace_name
        for out_object in location.object_names or []:
            print(f"Output object: {out_object} in bucket {out_bucket} namespace {out_namespace}")
            response = object_storage.get_object(
                namespace_name=out_namespace,
                bucket_name=out_bucket,
                object_name=out_object,
            )
            if response.status != 200:
                print(f"Failed to get object {out_object}. Status code: {response.status}")
                continue
            results.append(json.load(response.data.raw))
            # Remove the result object from the bucket. A failed cleanup must
            # not throw away a transcript that was already parsed.
            try:
                object_storage.delete_object(
                    namespace_name=out_namespace,
                    bucket_name=out_bucket,
                    object_name=out_object,
                )
            except Exception as e:
                print(f"Failed to delete output object {out_object}: {e}")
    return results


# Convert speech to text
def speech2text(object_name, poolingwait=3):
    """Transcribe an audio object from the bucket with OCI Speech.

    Creates a transcription job for ``object_name``, polls it until it reaches
    a terminal state while showing progress in ``status_placeholder``, and
    returns the parsed output. The job id is stored in
    ``st.session_state.job_id``.

    Args:
        object_name: Name of the uploaded audio object in the bucket.
        poolingwait: Seconds to sleep between two job status polls.

    Returns:
        A list of parsed JSON result documents when the job succeeds, or None
        when the job ends as FAILED, CANCELING or CANCELED.
    """
    start_time = datetime.now()

    # Job definition
    job_details = CreateTranscriptionJobDetails(
        compartment_id=COMPARTMENT_ID,
        display_name="PythonSDKSampleTranscriptionJob",
        description="Transcription job created by Python SDK",
        input_location=ObjectListInlineInputLocation(
            location_type="OBJECT_LIST_INLINE_INPUT_LOCATION",
            object_locations=[ObjectLocation(
                namespace_name=namespace,
                bucket_name=bucketnm,
                object_names=[object_name])]),
        output_location=OutputLocation(
            namespace_name=namespace, bucket_name=bucketnm),
        model_details=TranscriptionModelDetails(
            model_type="WHISPER_MEDIUM", language_code="ja"),
    )

    # Create the job
    job = speech_client.create_transcription_job(job_details).data
    st.session_state.job_id = job.id

    status_message = ""
    try:
        # Poll until the job reaches a terminal state
        while True:
            # Honour the caller-supplied interval; it used to be ignored in
            # favour of a hard-coded 10 seconds.
            time.sleep(poolingwait)
            job = speech_client.get_transcription_job(job.id).data

            # Elapsed time without the fractional seconds
            elapsed = str(datetime.now() - start_time).split('.')[0]
            new_status_message = f"Job Status: {job.lifecycle_state} - Processing Time: {elapsed}"
            if new_status_message != status_message:
                status_placeholder.write("更新中...")
                time.sleep(1)
                status_placeholder.empty()
                status_placeholder.write(new_status_message)
                status_message = new_status_message

            if job.lifecycle_state == "SUCCEEDED":
                # Finished
                print("Transcription job finished.")
                return _collect_transcription_results(job.id)

            if job.lifecycle_state in ("FAILED", "CANCELING", "CANCELED"):
                # Failed
                print(f"Transcription job failed with status: {job.lifecycle_state}")
                return None
            # Any other state: keep polling.
    finally:
        print("Transcription job ended.")
        # Best-effort cleanup: an error while deleting the job must not mask
        # the result or the exception already on its way to the caller.
        try:
            speech_client.delete_transcription_job(job.id)
        except Exception as e:
            print(f"Failed to delete transcription job {job.id}: {e}")
# Turn the transcription result into natural language with the generative AI model
def parse_talking(textjson):
    """Ask the chat model to rewrite the transcription JSON as natural text.

    Args:
        textjson: JSON-serialisable transcription result; it is dumped
            (pretty-printed, non-ASCII preserved) and appended to the prompt.

    Returns:
        The ``text`` attribute of the chat response.
    """
    status_placeholder.write("自然言語へ変換中...")

    # Instruction line followed by the serialised transcription.
    prompt = (
        "音声から文字列に変換された結果が格納されたJSONデータを自然な言葉に変換して下さい。\n"
        + json.dumps(textjson, ensure_ascii=False, indent=2)
    )

    # Chat request, serving mode and compartment bundled into one payload.
    details = ChatDetails(
        compartment_id=COMPARTMENT_ID,
        chat_request=CohereChatRequest(
            message=prompt,
            max_tokens=1000,
            temperature=0.7,
            is_echo=True,
            is_stream=False,
        ),
        serving_mode=OnDemandServingMode(model_id=MODEL_DEF),
    )

    # Call the chat API and hand back only the generated text.
    return aiclient.chat(details).data.chat_response.text
# Main flow: upload -> transcribe -> rewrite with the chat model -> display.
if uploaded_file is not None:
    object_name = uploaded_file.name
    if not upload_contents_to_bucket(uploaded_file.getvalue(), object_name):
        st.error(f"ファイル '{object_name}' のアップロードに失敗しました。")
    else:
        st.success(f"ファイル '{object_name}' がバケット '{bucketnm}' にアップロードされました!")
        st.session_state.uploaded_file_name = object_name
        st.write("OCI Speechの処理を開始します。")
        finalresult = None
        with st.spinner('処理中です。しばらくお待ちください...'):
            try:
                textjson = speech2text(object_name)
            finally:
                # Always remove the uploaded audio, even if transcription raised.
                delete_file_from_bucket(object_name)
            # speech2text returns None when the job failed; an empty list
            # means no output could be read. Neither is worth sending to the
            # chat model.
            if textjson:
                finalresult = parse_talking(textjson)
        if finalresult is None:
            st.error("音声の文字起こしに失敗しました。")
        else:
            # Model output derives from user-supplied audio, so render it as
            # plain Markdown without raw HTML.
            st.markdown(finalresult)
else:
    st.write("音声/動画ファイルをアップロードして、処理を開始してください。")