OCIで AI エージェントが使えるようになりました。
ていうか、AI エージェントって何なんでしょうね。
どうも LLMをインターフェースとして色々な機能を盛っていくような感じですかね。
例えば、RAGなんかは、知識を与えておいて、この知識に対してLLMが回答したり考察してくれたりします。
LLMに対する質問をSQLに変換したりも可能なようです。
REST API をコールすることも出来そうですね。
ちょっと、試したんですが、関数を定義して、この関数を呼び出して結果に対して、LLMが何かするみたいなことも出来るようです。
まだ、出たばっかりだし、謎の機能だらけなんで、どう使えば良いかは、わかりません。
RAGであれば使えそうなんで、とりあえず、これを試してみましょう。
まず、ナレッジを作らないとダメなんですが、無くてもエージェントは作れるので、がわだけを作りますね。
ocicliとstreamlit が使える環境用です。
起動は、こんな感じです。
streamlit run chat1.py --server.port 8501 --server.enableCORS false
chat.py
import streamlit as st
import json
import oci
from oci.generative_ai_agent_runtime.models import(
CreateSessionDetails,
ChatDetails,
ChatResult,
UpdateSessionDetails,
RequiredAction,
FunctionCallingRequiredAction,
FunctionCallingPerformedAction,
PerformedAction
)
from agentai import agentai
# テーマの取得
theme = "dark" if st.config.get_option("theme.base") == "dark" else "light"
# モバイル表示の問題を修正
# テーマに応じたCSSを適用
st.markdown(f"""
<style>
@media (max-width: 800px) {{
.stChatInput {{
position: fixed;
bottom: 0;
left: 0;
width: 100%;
padding: 10px;
z-index: 1000;
transition: background-color 0.3s ease;
}}
.stChatInput textarea {{
width: 100%;
box-sizing: border-box;
}}
/* Lightモードのスタイル */
.{theme}-mode .stChatInput {{
background-color: #ffffff;
border-top: 1px solid #ccc;
box-shadow: 0 -2px 5px rgba(0, 0, 0, 0.1);
}}
/* Darkモードのスタイル */
.dark-mode .stChatInput {{
background-color: #1e1e1e;
border-top: 1px solid #444;
box-shadow: 0 -2px 5px rgba(0, 0, 0, 0.5);
}}
.dark-mode .stChatInput textarea {{
color: #ffffff;
background-color: #1e1e1e;
}}
}}
</style>
""", unsafe_allow_html=True)
# OCI設定
config = oci.config.from_file("~/.oci/config", "DEFAULT")
COMPARTMENT_ID = "ocid1.compartment.oc1..aaaaaaaamgmw22hogecwqnunirb3urhoger4ihdgoilkdjkv2sabokaq5svc"
#
# セッション状態の初期化
#
if 'current_agent' not in st.session_state:
st.session_state.current_agent = agentai( config=config, compid=COMPARTMENT_ID)
# エージェントリスト
available_agents = st.session_state.current_agent.list_aegent()
#タイトル
st.title("OCI AI Agent V1.0")
if len(available_agents) == 0:
st.error("エージェントが見つかりません。OCI Generative AI Agentを作成してください。")
st.stop()
# エージェント選択のドロップダウン(オプション)
selected_agent = st.sidebar.selectbox(
"エージェント",
available_agents,
index=0,
format_func = lambda item: f"{item[1]}",
key="agent_select_box"
)
agent_id = selected_agent[0]
agent_display = selected_agent[1]
change_agent = False
agent:agentai = st.session_state.current_agent
if agent.agent_id is None:
change_agent = True
else:
if agent.agent_id != agent_id:
agent.delete_session()
change_agent = True
# エンドポイントリスト
change_endpoint = False
endpoints = agent.list_agent_endpoints(agent_id)
if len(endpoints) == 0:
st.error("エンドポイントが見つかりません。OCI Generative AI Agentのエンドポイントを作成してください。")
st.stop()
selected_endpoint = st.sidebar.selectbox(
"エンドポイント",
endpoints,
index=0,
format_func = lambda item: f"{item[1]}",
key="endpoint_select_box"
)
endpoint_id = selected_endpoint[0]
endpoint_display = selected_endpoint[1]
if agent.agentendpoint_id is None:
change_endpoint = True
else:
if agent.agentendpoint_id != endpoint_id :
change_endpoint = True
# チャット履歴の初期化
if "messages" not in st.session_state:
st.session_state.messages = []
# エージェントとエンドポイントの変更があった場合、セッション状態をリセット
if change_agent or change_endpoint:
st.session_state.messages = []
session = agent.create_session(agent_id, endpoint_id)
# アシスタントからの挨拶
st.markdown(session.welcome_message)
print(f"セッション作成 エージェント: {agent_display} エンドポイント: {endpoint_display}")
# チャット履歴の表示
for message in st.session_state.messages:
role = "assistant" if message["role"] == "assistant" else "user"
with st.chat_message(role):
st.markdown(message["message"])
# ユーザー入力
if prompt := st.chat_input("ここにメッセージを入力してください..."):
print(f"セッションID: {agent.session_id}")
# ユーザーメッセージを履歴に追加
st.session_state.messages.append({"role": "user", "message": prompt})
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
with st.spinner("思考中..."):
bot_reply = ""
chat_history = []
for message in st.session_state.messages:
if message["role"] == "user":
chat_history.append({"role": "USER", "message": message["message"]})
elif message["role"] == "assistant":
chat_history.append({"role": "CHATBOT", "message": message["message"]})
# エージェントにプロンプトを送信
chatresult:ChatResult = agent.chat(prompt)
if chatresult.required_actions:
print("\n--- エージェントからのアクション要求 ---")
action:RequiredAction
for action in chatresult.required_actions:
if action.required_action_type == RequiredAction.REQUIRED_ACTION_TYPE_FUNCTION_CALLING_REQUIRED_ACTION:
fcaction:FunctionCallingRequiredAction = action
function_call = fcaction.function_call
function_name = function_call.name
function_arguments_str = function_call.arguments # 引数は文字列として渡されます
swagger = function_call.swagger_types
print(f"関数呼び出しが要求されました: {action.action_id}")
print(f" 呼び出し関数名: {function_name}")
print(f" 引数: {function_arguments_str}")
print(f" swagger: {swagger}")
# 引数文字列をJSONとしてパース
if function_name == "hogehoge":
try:
function_arguments = json.loads(function_arguments_str)
keyword_decode = function_arguments['keyword']
print(f" 関数引数 (keyword): {keyword_decode}")
result = "hogehoge の 結果です。"
print(f" hogehoge: {result}")
chatresult:ChatResult = agent.chatFunctionResponse(f"result",action.action_id,result)
print(chatresult)
except json.JSONDecodeError:
print(f" エラー: 関数引数 '{function_arguments_str}' が有効なJSONではありません。")
function_arguments = {} # エラー時は空の辞書を割り当て
if chatresult.message is None:
trace:oci.generative_ai_agent_runtime.models.Trace
for trace in chatresult.traces:
if trace.trace_type == oci.generative_ai_agent_runtime.models.Trace.TRACE_TYPE_GENERATION_TRACE :
generation:oci.generative_ai_agent_runtime.models.GenerationTrace = trace
bot_reply = generation.generation
else:
print(f"{chatresult.message.time_created}:{chatresult.message.role}:{chatresult.message.content.text}")
bot_reply = chatresult.message.content.text
if bot_reply:
# アシスタントの応答を履歴に追加
st.session_state.messages.append({"role": "assistant", "message": bot_reply})
st.markdown(bot_reply)
agentai.py
import threading
import datetime
import time
import oci
from oci.generative_ai_agent import GenerativeAiAgentClient
from oci.generative_ai_agent_runtime import GenerativeAiAgentRuntimeClient
from oci.generative_ai_agent_runtime.models import(
CreateSessionDetails,
ChatDetails,
ChatResult,
UpdateSessionDetails,
FunctionCallingRequiredAction,
FunctionCallingPerformedAction,
PerformedAction
)
#
# エージェントAI
#
class agentai:
def __init__(self, config, compid):
self.compartment_id = compid
self.agent_id = None
self.client = GenerativeAiAgentClient(config)
self.runtime_client = GenerativeAiAgentRuntimeClient(config)
self.session_id = None
self.agentendpoint_id = None
self.thread = None
self.last_action_time = datetime.datetime.now()
#
# エージェントリスト
#
def list_aegent(self):
# エージェントリスト
available_agents = []
ret = self.client.list_agents(compartment_id=self.compartment_id)
agentmodels:oci.generative_ai_agent.models.AgentCollection = ret.data
agentmodel:oci.generative_ai_agent.models.Agent
for agentmodel in agentmodels.items:
if agentmodel.lifecycle_state == oci.generative_ai_agent.models.Agent.LIFECYCLE_STATE_ACTIVE :
agenttuple = [agentmodel.id, agentmodel.display_name]
available_agents.append(agenttuple)
return available_agents
#
# エージェントエンドポイントリスト
#
def list_agent_endpoints(self, agentid):
"""エージェントエンドポイントリストを取得"""
available_agentendpoints = []
ret = self.client.list_agent_endpoints(
compartment_id=self.compartment_id,
agent_id=agentid)
agentendpoints:oci.generative_ai_agent.models.AgentEndpointCollection = ret.data
for agentendpoint in agentendpoints.items:
if agentendpoint.lifecycle_state == oci.generative_ai_agent.models.AgentEndpoint.LIFECYCLE_STATE_ACTIVE :
agentendpointtuple = [agentendpoint.id, agentendpoint.display_name]
available_agentendpoints.append(agentendpointtuple)
return available_agentendpoints
#
# セッションキープアライブ
#
def _keep_alive(self ):
"""1分毎にセッションキープリクエストを送信"""
mysession_id = self.session_id
while mysession_id == self.session_id:
# タイムアウト判定
current_time = datetime.datetime.now()
time_difference = current_time - self.last_action_time
seconds = time_difference.total_seconds()
if seconds > 60*60*2 :
print(f"Timeout: {mysession_id},{self.last_action_time.strftime("%Y-%m-%d %H:%M:%S")}")
break
# 1分待機
time.sleep(60)
try:
# バックエンドサーバーにセッションキープリクエストを送信
self.update_session()
except Exception as e:
print(f"Session keep-alive error: {type(e)} -> {e}")
print(f"keep alive: {mysession_id},{self.last_action_time.strftime("%Y-%m-%d %H:%M:%S")}")
#
# セッション作成
#
def create_session(self, agentid, endpointid):
try:
if self.session_id is not None :
self.delete_session()
except Exception as e:
pass
self.agent_id = agentid
self.agentendpoint_id = endpointid
# セッション作成
ret = self.runtime_client.create_session(
create_session_details=CreateSessionDetails(display_name="A",description="AA"),
agent_endpoint_id = self.agentendpoint_id )
session:oci.generative_ai_agent_runtime.models.Session = ret.data
self.session_id = session.id
self.last_action_time = datetime.datetime.now()
self.thread = threading.Thread(target=self._keep_alive, daemon=True)
self.thread.start()
return session
#
# チャット
#
def chat(self, message, shouldstream = False):
self.last_action_time = datetime.datetime.now()
# チャット
ret = self.runtime_client.chat(
agent_endpoint_id=self.agentendpoint_id,
chat_details=ChatDetails(
user_message = message,
should_stream = shouldstream,
session_id = self.session_id
))
self.last_action_time = datetime.datetime.now()
chatresult:ChatResult = ret.data
return chatresult
#
# 関数応答
#
def chatFunctionResponse(self, message, action_id, result):
self.last_action_time = datetime.datetime.now()
ret = self.runtime_client.chat(
agent_endpoint_id=self.agentendpoint_id,
chat_details=ChatDetails(
user_message = message,
should_stream = False,
session_id = self.session_id,
performed_actions = [FunctionCallingPerformedAction(
action_id = action_id,
performed_action_type = PerformedAction.PERFORMED_ACTION_TYPE_FUNCTION_CALLING_PERFORMED_ACTION,
function_call_output =f"{result}")]
) )
self.last_action_time = datetime.datetime.now()
chatresult:ChatResult = ret.data
return chatresult
#
# セッション更新
#
def update_session(self):
self.runtime_client.update_session(
agent_endpoint_id=self.agentendpoint_id,
session_id = self.session_id,
update_session_details = UpdateSessionDetails(display_name="A",description="AA") )
#
# セッション削除
#
def delete_session(self):
ret = self.runtime_client.delete_session(
agent_endpoint_id=self.agentendpoint_id,
session_id=self.session_id)
self.session_id = None
self.agentendpoint_id = None
これで、チャットが出来るので、後は、だらだらと拡張していけば良いでしょ
次に、RAG用のナレッジ作りですけど、こいつ、pdfとtxtしか対応していないらしいです。
いや、お仕事環境だと、WordとかExcelとかPowerPointがメインじゃんか
どうすんのってことで、変換して、良い感じにオブジェクトストレージにぶち込むスクリプトを作成しました。
convert2pdf.py
import subprocess
import os
def convert_to_pdf(input_file: str, output_dir: str):
"""
LibreOfficeを使用してオフィスファイルをPDFに変換します。
"""
# 入力ファイルが存在するかチェック
if not os.path.exists(input_file):
print(f"エラー: ファイルが見つかりません - {input_file}")
return
# コマンドをリスト形式で準備
command = [
"libreoffice",
"--headless",
"--convert-to",
"pdf",
input_file,
"--outdir",
output_dir
]
print(f"実行中: {' '.join(command)}")
try:
# サブプロセスとしてコマンドを実行
result = subprocess.run(
command,
check=True, # コマンドが失敗した場合に例外を発生させる
capture_output=True, # 標準出力と標準エラーをキャプチャする
text=True # 出力をテキストとしてデコードする
)
print("✅ 変換に成功しました!")
print(f"出力先: {output_dir}")
# LibreOfficeからの詳細出力を表示したい場合
# print("--- LibreOffice Output ---")
# print(result.stdout)
except FileNotFoundError:
print("エラー: 'libreoffice' コマンドが見つかりません。")
print("LibreOfficeがインストールされ、PATHが通っているか確認してください。")
except subprocess.CalledProcessError as e:
print(f"❌ 変換中にエラーが発生しました。")
print(f"リターンコード: {e.returncode}")
print(f"エラー出力:\n{e.stderr}")
smbwalk.py
#!/usr/bin/python3
#!/usr/bin/env python3
import os
from smb.SMBConnection import SMBConnection
from smb.smb_structs import OperationFailure
class smbwalk:
server = None
def __init__(self, user, password, mymachinename, server):
self.conn = SMBConnection(user, password, mymachinename, server, use_ntlm_v2=True)
self.server = server
def __del__(self):
try:
if self.conn:
self.conn.close()
except Exception as e:
pass
def connect(self, port = 445):
try:
return self.conn.connect(self.server, port)
except Exception as e:
print(f"接続エラー: {e}")
return False
def walk(self,shared_folder,current_path, indent=0, callback=None):
try:
# 現在のパスにあるファイル/ディレクトリ一覧を取得
file_list = self.conn.listPath(shared_folder, current_path)
for f in file_list:
# '.' と '..' は無視する
if f.filename in ['.', '..']:
continue
# パスを結合
next_path = os.path.join(current_path, f.filename)
if f.isDirectory:
if callback != None and callback( self, shared_folder, f, next_path, indent) == False:
return
# ディレクトリなら再帰呼び出し
self.walk(shared_folder, next_path, indent=indent + 1, callback=callback)
else:
if callback != None and callback( self, shared_folder, f, next_path, indent) == False:
return
except OperationFailure as e:
print(f"エラー: パス '{current_path}' のリスト取得に失敗しました。{e}")
except Exception as e:
print(f"予期せぬエラーが発生しました: {e}")
def download(self, shared_folder, remote_path, local_path):
"""
SMBサーバーからファイルをローカルにダウンロードします。
Args:
shared_folder (str): 共有フォルダ名。
remote_path (str): 共有フォルダ内でのリモートファイルのパス。
local_path (str): ダウンロード先のローカルファイルのパス。
Returns:
bool: 成功した場合True、失敗した場合False。
"""
try:
# ローカルディレクトリが存在しない場合は作成
local_dir = os.path.dirname(local_path)
if local_dir and not os.path.exists(local_dir):
os.makedirs(local_dir)
print(f"ディレクトリを作成しました: {local_dir}")
with open(local_path, 'wb') as fp:
file_attributes, filesize = self.conn.retrieveFile(
shared_folder, remote_path, fp
)
print(f"ダウンロード完了: {remote_path} -> {local_path} ({filesize} bytes)")
return True
except OperationFailure as e:
print(f"ダウンロードエラー: {remote_path} のダウンロードに失敗しました。{e}")
return False
except Exception as e:
print(f"予期せぬダウンロードエラー: {e}")
return False
objset.py
#!/usr/bin/python3
import io
import oci
class MultiUploader:
totallen = 0
buffer_size = 1024 * 1024 * 50
def __init__(self,namespace,config):
self.namespace = namespace
self.objstragecl:oci.object_storage.ObjectStorageClient = oci.object_storage.ObjectStorageClient(config)
None
#サイズを良い感じに表現する
def format_size(self, totallen):
# 単位の定義
units = ["Bytes", "KB", "MB", "GB", "TB"]
size = totallen
unit_index = 0
# 適切な単位を見つける
while size >= 1024 and unit_index < len(units) - 1:
size /= 1024
unit_index += 1
# 結果を文字列でフォーマット
return f"{size:.2f} {units[unit_index]}"
#非同期Upload
def _async_task(self,chunk:bytes, mudata:oci.object_storage.models.MultipartUpload, part_details_list:list, bucket, objectname):
partnum = 0
partnum=len(part_details_list)+1
# パートに出力する
part_details:oci.response.Response = self.objstragecl.upload_part( namespace_name=self.namespace,bucket_name=bucket, object_name=objectname,
upload_id=mudata.upload_id,
upload_part_num=partnum,
upload_part_body=io.BytesIO(chunk) )
part_details_list.append(oci.object_storage.models.CommitMultipartUploadPartDetails(
part_num=partnum,
etag=part_details.headers["ETag"]
))
self.totallen=self.totallen+len(chunk)
print("upload part {0}:{1}".format( len(part_details_list), self.format_size(self.totallen)) )
#マルチパートUpload
def upload( self, bucket, objectname,contents:bytes ):
binstream = io.BytesIO(contents)
if len(contents) > 1024*1024*100:
#バイナリは、巨大ファイル対応としておく
res:oci.response.Response = self.objstragecl.create_multipart_upload(namespace_name=self.namespace,bucket_name=bucket,
create_multipart_upload_details=oci.object_storage.models.CreateMultipartUploadDetails(
object = objectname
) )
mudata: oci.object_storage.models.MultipartUpload = res.data
part_details_list = []
tasks = []
while True:
# 標準入力 読み込む
chunk:bytes = binstream.read(self.buffer_size)
if not chunk:
break
# パートに出力する
task = self._async_task(chunk, mudata, part_details_list, bucket, objectname )
tasks.append(task)
#コミット
self.objstragecl.commit_multipart_upload(namespace_name=self.namespace,bucket_name=bucket, object_name=objectname,upload_id=mudata.upload_id,
commit_multipart_upload_details=oci.object_storage.models.CommitMultipartUploadDetails(
parts_to_commit=part_details_list) )
else:
res:oci.response.Response = self.objstragecl.put_object(namespace_name=self.namespace,bucket_name=bucket, object_name=objectname,
put_object_body = contents)
buildknowledge.py
#!/usr/bin/env python3
import os
import argparse
import oci
from smbwalk import smbwalk
import convert2pdf
import objset
# --- 接続情報 ---
USER_ID = 'user1'
PASSWORD = 'passwd'
CLIENT_MACHINE_NAME = 'ocicli'
SERVER_NAME = 'windowsserver'
SHARED_FOLDER = 'document'
START_FOLDER= '/'
# 格納先
BUCKET="knowledge"
output_dir = '/tmp/pdf_output'
NAMESPACE = "ocinamespace"
config = oci.config.from_file("~/.oci/config", "DEFAULT")
objuploader = objset.MultiUploader(NAMESPACE,config)
def callback_walk(smbwalk , shared_folder, f, fullpath, indent):
if f.isDirectory == False:
base,ext = os.path.splitext(f.filename)
if ext.lower() in ['.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx']:
print(f"{indent} {f.filename} (変換中...)")
# 変換ディレクトリ
os.makedirs(output_dir, exist_ok=True)
# ダウンロード
localpath = os.path.join(output_dir, f.filename)
if smbwalk.download(shared_folder,fullpath,localpath ) :
convert2pdf.convert_to_pdf( localpath , output_dir)
pdffilename = base + "." + "pdf"
pdflocalfilepath = os.path.join(output_dir, pdffilename)
with open(pdflocalfilepath, mode="rb") as file:
# ファイルの内容をバイト列として読み込む
contentsbin = file.read()
parentdir = os.path.dirname(fullpath)
destpath = os.path.join(parentdir,pdffilename)
objuploader.upload( BUCKET, destpath, contentsbin )
# 作業ファイル削除
try :
os.remove(localpath)
outputfile = os.path.join(output_dir, pdffilename)
os.remove(outputfile)
except Exception as e:
pass
if ext.lower() in ['.txt','.pdf']:
# ダウンロード
localpath = os.path.join(output_dir, f.filename)
if smbwalk.download(shared_folder,fullpath,localpath ) :
with open(localpath, mode="rb") as file:
# ファイルの内容をバイト列として読み込む
contentsbin = file.read()
parentdir = os.path.dirname(fullpath)
destpath = os.path.join(parentdir,f.filename)
objuploader.upload( BUCKET, destpath, contentsbin )
# 作業ファイル削除
try :
os.remove(localpath)
except Exception as e:
pass
print(f"{indent}:{f.filename}")
return True
DEBUG = False
# オプション処理
parser = argparse.ArgumentParser(description='using \n \nex)\n buildknowledge.py smbuser smbpassword smbserver smbshare bucket [--startfolder /hoge/fuga]')
parser.add_argument('smbuser', help='smbuser')
parser.add_argument('smbpassword', help='smbpassword')
parser.add_argument('smbserver', help='smbserver')
parser.add_argument('smbshare', help='smbshare')
parser.add_argument('bucket', help='bucket')
parser.add_argument('--startfolder', help='開始フォルダ')
if( DEBUG == False):
args = parser.parse_args()
USER_ID = args.smbuser
PASSWORD = args.smbpassword
SERVER_NAME = args.smbserver
SHARED_FOLDER = args.smbshare
BUCKET = args.bucket
if args.startfolder:
START_FOLDER = args.startfolder
# Windows共有からファイル探索
smbwalk = smbwalk(USER_ID, PASSWORD, CLIENT_MACHINE_NAME, SERVER_NAME)
is_connected = smbwalk.connect()
if is_connected:
print(f"サーバー '{SERVER_NAME}' への接続に成功しました。")
print(f"\n--- 共有フォルダ '{SHARED_FOLDER}' ---")
# 探索開始
smbwalk.walk(SHARED_FOLDER, START_FOLDER,
callback=callback_walk)
else:
print(f"サーバー '{SERVER_NAME}' への接続に失敗しました。")
buildknowledge.py の NAMESPACEだけ、自分とこの NAMESPACEにしておいて下さい。
ocicliが使える環境用です。
後、PDFに変換する為に、libreofficeを使います。
こんな感じで、パッケージをぶち込むんですよ。
apt install unoconv pdftk-java fonts-ipafont-gothic fonts-ipafont-mincho
それと、日本語PDFへの変換は、めっちゃ Windowsのフォントが必要なんで、WindowsからFontをぱくってきて、Linuxに入れて下さい。
Windowsのフォントのありか
%SystemRoot%\Fonts
Linuxのフォントのありか
/usr/share/fonts
普通にファイル達をディレクトリ毎コピーすれば良いですよ
入れた後に
fc-cache -f -v
で、フォントを認識させればOK
使い方は、こんな感じです。
./buildknowledge.py WindowsUser WindowsPassword windowsserver Document OCIBucket --startfolder "/大事なとこ"
これで、指定したバケットに、WordとかExcelのファイルがpdfに変換されてぶち込まれます
出来たバケットをエージェント AI のナレッジベースに指定すれば、AI様が 賢くなって ありがたきお言葉を頂けるようになります。
これで、自分だけの最強 AI 様を作ってみましょう。