
- DatabricksのModel Servingが楽しい・・・
- Vector Search
- Vector Searchを使ってベクトルデータベースを構築
- LLMからVector Indexへのクエリを実行出来るようにする
- PlaygroundでLLMからVector Indexにクエリ実行をさせる
- まとめ
こんにちは、CCCMKホールディングス TECH LABの三浦です。
スーパーに行くのが好きです。季節イベントに関する催し物をやっていることが多くて「もうこんな時期なんだなぁ」と感じることが出来るからです。今はハロウィンの時期ですが、もうおせちの予約の告知もやっていて時の流れの速さを感じました。
DatabricksのModel Servingが楽しい・・・
Azure DatabricksのModel Servingに関連する機能が面白くて最近よく色々試しています。Azureの日本リージョンでは利用できない機能ですが、たとえばAI FunctionはSQLクエリの中でModel Servingで提供されるLLMなどのモデルを呼び出してクエリ検索結果に対してシームレスにモデルを適用し、その結果を取得することが出来ます。
以前検証した時の記事はこちら↓。
LLMを使ったアプリケーションに欠かせないテクニック"Retrieval-Augmented Generation(RAG)"をDatabricksで実装する方法の1つにDatabricksで利用できる"Vector Search"を使う方法があります。元になるドキュメントをDelta Tableとして格納し、Model Serving Endpointに登録した埋め込みモデルを使う、という構成で比較的容易にRAGの要であるベクトルデータベースを構築することが出来ました。
さらにベクトルデータベースに対するクエリ機能もSQL関数で登録することが出来ます。Model ServingのFoundation Model APIで提供されているTool Callingに対応したLLMにその関数をToolとして渡してあげると簡単にRAG Agentを構築することが出来ました。
今回は私が書いたブログ記事をVector Searchでベクトルデータベース化し、Foundation Model APIのLLMから呼び出すところまでを試してみました。
Vector Search
Azure DatabricksにおけるVector Searchは次のドキュメントに詳細が記載されています。
Vector Searchで構築したベクトルデータベースはHybrid Searchを利用して関連データを取得することが出来るようになります。Hybrid Searchはクエリに含まれるキーワードと一致する単語が含まれるデータを取得する処理と埋め込み表現による近似計算によるデータ取得を組み合わせた検索方法です。
またVector Searchではデータベースの構成方法について以下のような3つのオプションを選ぶことが出来ます。
オプション1
元になるテキストデータが格納されたDelta Tableを用意。ベクトルデータベース構築時に各レコードの対象のテキスト項目に対する埋め込み表現を計算し、Unity Catalog上に保存する。オプション2
テキストデータとそれに対する埋め込み表現を事前に計算、両方とも含むDelta Tableを用意し、ベクトルデータベースを構築するオプション3
Delta Tableを用意せずにVector SearchのAPIを通じて直接ベクトルデータベースに格納する
オプション1の場合は元のDelta Tableに変更があった際、それをトリガーに自動的に埋め込み計算を走らせ、ベクトルデータベースを更新する、といったことが可能になります。
Vector Searchを使ってベクトルデータベースを構築
Vector Searchを使ってベクトルデータベースを構築してみました。使用するデータは私が書いたブログの記事からピックアップしました。curlでhtmlをダウンロードして、Pythonのライブラリtrafilaturaでコンテンツを取得します。
Databricksでデータを加工する際によく使われる"メダリオンアーキテクチャ"を意識して、次のような流れでベクトルデータベースを構築しました。

具体的な作業内容について、各パートごとに紹介します。
ブログ⇒テキスト抽出⇒Delta Tableへの格納
ブログからテキストを取得し、そのままDelta Tableに書き込むまでの流れです。DatabricksのNotebookで次のように処理を実行していきました。
必要なライブラリのインストール。
%pip install lxml_html_clean trafilatura dbutils.library.restartPython()
ブログ記事のダウンロード。
%sh mkdir ./download_files curl https://techblog.cccmkhd.co.jp/entry/2024/10/15/123853 -o ./download_files/20241015.html curl https://techblog.cccmkhd.co.jp/entry/2024/10/08/155058 -o ./download_files/20241008.html curl https://techblog.cccmkhd.co.jp/entry/2024/10/01/145518 -o ./download_files/20241001.html
ダウンロードしたHTMLファイルを読み込み、pandas.DataFrameに格納する。
from pathlib import Path import pandas as pd from trafilatura import extract def load_file(path: Path): """HTMLファイルを読みこみ、trafilaturaでテキスト抽出する関数""" with open(path, "r") as f: text_content = f.read() return extract(text_content) load_target_files = Path("./download_files/") text_contens = [load_file(p) for p in load_target_files.glob('*.html')] df = pd.DataFrame({ "text":text_contens })
Delta Tableに書き込む。catalogやschemaは環境に合わせたものを使用してください。
catalog = "catalog_name" schema = "schema_name" table = "blog_raw_table" spark.createDataFrame(df)\ .write.mode("overwrite")\ .saveAsTable(f"{catalog}.{schema}.{table}")
Delta Table⇒テキスト分割(chunking)⇒Delta Tableへの格納
次はブログのテキストを細分化(chunking)したのちDelta Tableへ書き込むまでの処理です。chunkingはLangChainのRecursiveCharacterTextSplitterを使用しました。
まず前のステップで作成したDelta Tableを読み込みます。
# `catalog_name`や`schema_name`は環境に合わせたものを使用してください。 spdf = spark.sql( """ SELECT text FROM catalog_name.schema_name.blog_raw_table """ ) df = spdf.toPandas()
次にRecursiveCharacterTextSplitterでchunkingを実行します。
from langchain_text_splitters import RecursiveCharacterTextSplitter text_splitter = RecursiveCharacterTextSplitter( chunk_size=523, chunk_overlap=20, length_function=len, is_separator_regex=False, ) splitted_page_contents = [] for i,row in df.iterrows(): # 一行ずつ読み込んでtextを分割する splitted_texts = text_splitter.split_text(row.text) for text in splitted_texts: splitted_page_contents.append(text)
Delta Tableに書き込むデータを用意します。行ごとに一意な値を設定するid列をuuid4()で生成しました。
from uuid import uuid4 import pandas as pd df = pd.DataFrame( { "id":[str(uuid4()) for _ in splitted_page_contents], "text": splitted_page_contents, } )
Delta Tableに書き込みを行います。
catalog = "catalog_name" schema = "schema_name" table = "blog_chunked_table" spark.createDataFrame(df)\ .write\ .mode("overwrite")\ .saveAsTable(f"{catalog}.{schema}.{table}")
最後にALTER TABLEでテーブルのプロパティdelta.enableChangeDataFeedをtrueに設定しました。これはこれから作成するVector Indexをこのテーブルと同期させるために必要になる設定のようです。
spark.sql(
"""
ALTER TABLE catalog_name.schema_name.blog_chunked_table \
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""
)
埋め込みモデルのModel Serving Endpointの作成
Vector Index作成時、テキストに対する埋め込み表現を計算するための埋め込みモデルをModel Servingから利用できるようにEndpointを作成します。
こちらはDatabricks WorkspaceのGUIで作業を進めました。今回はAzure OpenAI Serviceの"text-embedding-3-large"を外部モデルとしてEndpointに登録してみました。
手順はWorkspaceの"Serving"から"Create serving endpoint"に進み、以下の様に設定を入力します。

完了すると、Serving endpointsの一覧に追加されます。

Delta Table⇒埋め込み⇒Vector Index
Vector Searchを使ってベクトルデータベース(Vector Index)の作成を行います。Vector SearchはPythonのSDKを使って利用することが出来ます。
最初にVector SearchのSDKをインストールします。
%pip install databricks-vectorsearch dbutils.library.restartPython()
Vector IndexにはEndpointを通じてアクセスすることになるので、次のコードで"blog_vector_search"という名称のEndpointを作成します。
from databricks.vector_search.client import VectorSearchClient client = VectorSearchClient() client.create_endpoint( name="blog_vector_search", endpoint_type="STANDARD" )
次のコードでVector Indexを作成します。
index = client.create_delta_sync_index(
endpoint_name="blog_vector_search",
source_table_name="catalog_name.schema_name.blog_chunked_table",
index_name="catalog_name.schema_name.blog_chunked_table_index",
pipeline_type="TRIGGERED",
primary_key="id",
embedding_source_column="text",
embedding_model_endpoint_name="text-embedding-3-large"
)
作成時のオプションは以下の通りです。
endpoint_name: Endpointの名前。source_table_name: 埋め込み元のDelta Table名。index_name: 作成するVector Index名。Unity CatalogのTableとして作成されます。pipeline_type: 埋め込み元のTableとの同期モード。"TRIGGERED"は明示的に同期を実行する必要があります。primary_key: Vector Indexのprimary_keyとして使用するカラムembedding_source_column: 埋め込み対象のカラムembedding_model_endpoint_name: 使用する埋め込みモデルのModel Serving Endpoint名
作成には少し時間がかかりました。作成を開始するとWorkspaceのCatalogのTablesに作成中のIndexが追加されます。作成中はUpdate statusを見て進行状態を確認することが出来ました。
PythonSDKを使ってVector Indexにクエリを実行してみました。
result = index.similarity_search(
query_text="Foundation Model API",
columns=["id", "text"],
num_results=1,
query_type="hybrid"
)
print(result)
SQLでもクエリを実行することが出来ます。その際はvector_searchというSQL関数を使用します。
SELECT text FROM VECTOR_SEARCH( index => "catalog_name.schema_name.blog_chunked_table_index", query => "Foundation Model APIについて", num_results => 2 )
WorkspaceのSQL Editorで実行すると、結果が得られました。

LLMからVector Indexへのクエリを実行出来るようにする
Vector Indexが出来上がったので、今度はLLMからクエリを実行できるよう、クエリ処理をTool化しました。コードを実行して登録するのですが、実行するComputerのタイプによっては以下のエラーが発生します。メッセージにあるように、SQL Serverless上で実行しました。
[UNSUPPORTED_FEATURE.AI_FUNCTION] The feature is not supported: AI function vector_search is only available in Interactive Workloads, Jobs, SQL Pro and SQL Serverless, or it's disabled explicitly. SQLSTATE: 0A000
コードはこちらです。Vector Indexと同じCatalog, SchemaのFunctionsに登録しました。
CREATE OR REPLACE FUNCTION catalog_name.schema_name.get_blog_contents (query STRING) RETURNS TABLE(answer STRING) COMMENT '与えられたクエリに関係する、テックブログの記事の内容のリストを返す関数です。' LANGUAGE SQL RETURN SELECT text as answer FROM VECTOR_SEARCH( index => "catalog_name.schema_name.blog_chunked_table_index", query => query, num_results => 5 )
PlaygroundでLLMからVector Indexにクエリ実行をさせる
最後に今登録したToolをLLMに与え、質問内容に応じてVector Indexにクエリを実行する、Agentの動作を確認してみました。とりあえず動作を試すのであれば、WorkspaceのPlaygroundを使うのが早いです。
PlaygroundでTool Callingに対応したLLM, たとえば"databricks-meta-llama-3-1-70b-instruct"を選択し、"Tools"から"Add tool"をクリック、セレクトボックスに先ほど追加したToolが表示されるので選択します。

あとはチャット形式で質問を入力すると、LLMが判断して必要に応じてToolを経由してVector Indexにクエリを実行し、関連する情報を参照した回答を生成出来ることを確認することが出来ます。


まとめ
今回はDatabricksのVector Searchを使ってベクトルデータベース(Vector Index)の構築、そしてLLMから呼び出してRAG Agentを作るところまで試してみました。ところどころ詰まるところもありましたが、一回やってみれば今後はかなりスピーディーにRAG Agentを作ることが出来そうです。引き続き色々Model Serving周りを試してみたいな、と思います!