Vポイントマーケティング|TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

LangGraphでAzure DatabricksのTableに自然言語で問い合わせが出来るアプリを作ってみました!

はじめに

こんにちは、CCCMKホールディングス三浦です。

この前の土日、東京は桜がちょうどきれいに咲いていて、散歩をしながらお花見が出来ました。自分にとってはなんとなく桜を見ることが、一年の節目になってるなぁと感じます。

最近Azure東日本リージョンのDatabricksでModel Servingという機能が使えるようになった、というニュースを知りました。海外のリージョンでは以前から使えていましたが、ついに日本でも利用出来るようになってとてもうれしいです。

DatabricksのModel Servingを利用すると、機械学習モデルやLangChainやLangGraphで構築したAgentの推論機能を提供する独自のREST APIを提供できるようになります。色々なサービスにAI機能を連携させることが出来るようになり、AIの活用が促進出来そうです。

そんなModel Servingの利用を視野に入れ、最近LLM Agent開発フレームワーク"LangGraph"を使ってDatabricks Unity CatalogのTableのデータを自然言語で参照出来るアプリを実験的に作っていました。しかし、残念ながら今回作ったAgentアプリはModel Servingでは動かすことが出来ません。こちらのDatabricks Communityにあるように、Tableにクエリを実行する際に必要になるSpark Sessionが、Model Serving内では有効化されていないためです。

Model Servingで動かすことは出来なかったものの、面白いアプリが出来たのでぜひご紹介させてください!

やりたいこと

ユーザーが自然言語でアプリに質問を入力すると、その質問に回答するために必要な情報をTableから抜き出すSQLクエリをLLMが自動生成し、SQLクエリを実行します。得られた結果を参照してLLMが回答を生成する、という流れを実現します。質問によってはTableの情報が不要な場合もあり、その場合はそのままLLMに回答をさせる処理に流れるようにします。

まとめると、以下の図のようになり、これをLangGraphで実装していきました。

アプリの処理の全体図

使用したデータ

今回この検証で使用したデータはHugging Faceに公開されているこちらの音楽に関するデータセットです。

huggingface.co

利用したLLM

DatabricksのModel Servingでは、予め推論処理に最適化されたいくつかのLLMが"Foundation Model APIs"として提供されています。今回はこの中から"databricks-meta-llama-3-3-70b-instruct"というLLMを使用しました。

データの準備

LangGraphでアプリを作る前に、参照するTableを作成しました。次の手順で実施しました。

データセットのダウンロード

次のコードを実行します。

from datasets import load_dataset

ds = load_dataset("maharshipandya/spotify-tracks-dataset")

このデータセットは各楽曲の色々な属性情報が含まれていて、見ていると面白いです。

楽曲の"energy"と"loudness"との関係

Tableへの書き込み

ダウンロードしたデータセットをTableに書き込みます。Unity CatalogのTableに書き込むので、Catalog, Schema, Table名を指定して書き込みます。

spark.createDataFrame(ds["train"].to_pandas().drop("Unnamed: 0",axis=1))\
    .write.format("delta")\
    .mode("overwrite")\
    .saveAsTable(f"{catalog_name}.{schema_name}.`{table_name}`")

COMMENTの付与

後々LLMにSQLを生成させる際に、対象のTableにどんなデータが格納され、どういったカラムで構成されているのかの情報が必要になります。これらの情報はCOMMENTとしてTableとカラムそれぞれに付与してみました。

COMMENTを付けておくとLLMはもちろん、人にとってもデータの内容がすぐに分かるようになるので、意識して付けるようにしていきたいです。

COMMENTを付けておくとこのように確認出来ます。

COMMENTは画面上で編集することが出来ますが、次のようにコマンドで付与することも可能です。COMMENTとして設定した内容は、今回使用しているデータセット"maharshipandya/spotify-tracks-dataset"のDataset Cardに記載されている内容です。

TableのCOMMENT設定のコマンドです。COMMENT ON TABLEというコマンドを使用しました。

COMMENT ON TABLE catalog_name.schema_name.table_name IS 'This is a dataset of Spotify tracks over a range of 125 different genres. Each track has some audio features associated with it.';

各カラムに対するCOMMENTの設定です。こちらはALTER TABLEで行いました。

ALTER TABLE catalog_name.schema_name.table_name
ALTER COLUMN track_id COMMENT 'The Spotify ID for the track';

ALTER TABLE catalog_name.schema_name.table_name
ALTER COLUMN artists COMMENT 'The artists names who performed the track. If there is more than one artist, they are separated by a ;';
...

アプリの構築

Tableの準備が整ったので、LangGraphでアプリの構築を行います。最初に掲載したアプリの全体像の図の、各パーツをLangGraphの"Node"で実装していく流れになります。

LLMとアプリのState

まず、アプリ全体で使用するLLMの設定と、状態を管理するStateクラスを設定します。

llm_endpoint = "databricks-meta-llama-3-3-70b-instruct"
llm = ChatDatabricks(endpoint=llm_endpoint)

class State(TypedDict):
    messages: Annotated[list, add_messages]
    sql: str
    query_result: str

rooting

まずユーザーの質問に対し、次に実行するNodeを決定する"rooting"のNodeです。Tableの情報が必要な場合はSQLクエリを生成する"create_sql"に、必要ない場合はLLMで回答を生成する"normal_chat"のNodeに流れるようにします。

このNodeではLLMが次のNodeの名前以外を生成することは望ましくないため、"Structured Output"を利用して出力の制御をします。出力形式をTypedDictを継承したクラスで定義し、LangChainのLLMで使えるwith_structured_outputメソッドで渡してあげるとStructured Outputを実現することが出来ます。

class Rooting(TypedDict):
    direction: Literal["create_sql", "normal_chat"]

def rooting(state: State):
    """StructuredOutputでクエリに対してDB問い合わせが必要かどうか判断する"""
    query = state["messages"][-1].content
    llm_with_structured = llm.with_structured_output(Rooting)
    prompt = \
    f"""与えられた質問に対し、音楽作品やアーティストの参照が必要な場合は"create_sql"を、それ以外は"normal_chat"を出力してください。
    
    質問: {query}"""
    output = llm_with_structured.invoke(prompt)

    if output["direction"] == "create_sql":
        # SQLクエリ生成に進む
        return Command(goto="create_sql")
    else:
        # LLMによる回答生成に進む
        return Command(goto="normal_chat")

normal_chat

"rooting"の遷移先で、Tableの参照を必要としない場合に実行されるNodeです。

def normal_chat(state: State):
    """ユーザーの入力に対してデータベースを参照せずに回答する"""
    query = state["messages"][-1].content
    prompt = \
    f"""ユーザーの入力に回答してください。
    入力: {query}
    回答: """
    output = llm.invoke(prompt)
    return Command(
        goto=END, 
        update={
            "messages": [
                {
                    "role":"assistant",
                    "content":output.content,
                    "id":output.id
                }
            ]
        }
    )

create_sql

ここがこのアプリの一番のキモになる部分です。ユーザーの質問に対してそれに回答するために必要な情報を得るためのSQLクエリをLLMに生成させるNodeです。

まず登録済みのTableとカラムのCOMMENTを取得します。そしてそれらをLLMへのプロンプトに組み込み、参照するTableの情報を与え、SQLクエリを生成させます。

def create_sql(state: State):
    """ユーザーのクエリに回答するために必要な情報を検索するためのSQLを生成する"""

    # TableのCOMMENT取得
    table_info_str = spark.sql(
    f"""
    SELECT COMMENT FROM information_schema.tables
    WHERE table_catalog = '{CATALOG_NAME}'
    AND table_schema = '{SCHEMA_NAME}'
    AND table_name = '{TABLE_NAME}';
    """
    ).toPandas().to_csv()

    # カラム情報取得
    table_columns_info_str = spark.sql(
    f"""
    DESCRIBE TABLE {CATALOG_NAME}.{SCHEMA_NAME}.`{TABLE_NAME}`
    """
    ).toPandas().to_csv()

    query = state["messages"][-1].content

    sql_prompt = \
    f"""
    あなたは自然言語のユーザーのクエリを達成するために必要なSQLを生成するアシスタントです。

    次の情報を参照し、ユーザーのクエリを達成するためのSQLクエリを作成し、SQLクエリのみ出力してください。
    "`"で囲ったり、"sql"等の接頭詞を含めてはいけません。
    また、テーブル名は"`"を含め、絶対に省略してはいけません。

    # テーブル名
    `{CATALOG_NAME}`.`{SCHEMA_NAME}`.`{TABLE_NAME}`

    # テーブルのコメント
    {table_info_str}

    # テーブルのカラム情報
    {table_columns_info_str}

    クエリ: {query}
    sql:"""

    sql_query = llm.invoke(sql_prompt,temperature=0.0)
    return Command(goto="execute_sql", update ={"sql": sql_query.content})

execute_sql

"create_sql"で生成したクエリを実際に実行するNodeです。ここはLLMは使用せずにクエリを実行して返ってきた結果を次のNodeに渡します。

時々生成されるクエリに不具合があってクエリの実行に失敗してしまうことがあります。今回はクエリ実行エラーを例外でキャッチし、実行結果に"Error!"をセットするという簡単な対応にしていますが、エラーが発生した時は失敗したクエリをLLMにチェックさせ、修正させる、といった処理に遷移させると回答率を高められると思います。

def execute_sql(state: State):
    """SQLを実行する"""
    sql_query = state["sql"]
    try:
        query_result = spark.sql(sql_query).toPandas().to_csv()
    except:
        query_result = "Error!"
    
    if query_result == "Error!":
        return Command(goto=END, update ={"query_result": query_result})
    else:
        return Command(goto="answer", update ={"query_result": query_result})

answer

最後はSQLクエリで取得した結果を参照し、質問に対する回答をLLMに生成させるNodeです。

def answer(state: State):
    """ユーザーの質問にデータベースの結果を使って回答する"""

    query = state["messages"][-1].content
    query_result = state["query_result"]
    analyze_prompt = \
    f"""あなたは音楽に詳しいアシスタントです。
    与えられた情報は、ユーザーの質問に対し答えるために必要な情報をデータベースから抽出したものです。
    この情報を使ってユーザーの質問に答えて下さい。

    # データベースから取得した情報
    {query_result}

    質問: {query}
    回答: """

    output = llm.invoke(analyze_prompt)
    return Command(
        goto=END, 
        update ={"messages": [{"role":"assistant","content":output.content,"id":output.id}]}
    )

ビルド

これらのNodeをGraphに追加して、ビルドしてアプリとして利用出来るようにします。

graph_builder = StateGraph(State)
graph_builder.add_node("rooting", rooting)
graph_builder.add_node("normal_chat", normal_chat)
graph_builder.add_node("create_sql", create_sql)
graph_builder.add_node("execute_sql", execute_sql)
graph_builder.add_node("answer", answer)
graph_builder.set_entry_point("rooting")
app = graph_builder.compile()

動作確認

では、アプリの動作を見てみます。

クエリ: 落ち着いた曲を5曲探して、そのタイトルとアーティスト名を教えて。

以下の様に実行します。

input = {"messages": [("user", query)]}

app.invoke(input=input)

結果は次のようになりました。

落ち着いた曲を5曲探した結果、以下の曲が見つかりました。
Pure White Noise - Loopable with No Fade - Sleep Baby Sleep;Relaxation;White Noise Meditation;Spa Relaxation & Spa;Rest & Relax;Deep Sleep
White Noise - Loopable, No Fade - White Noise Sleep Sounds;Static White Noise Sounds;Real White Noise
Baby White Noise Series - Pure White Noise - Baby Sweet Dream
Pure Brown Noise - Loopable with No Fade - Deep Sleep;Relaxation And Meditation;Relax;Ontspanning Sound;Relajacion Del Mar
Pure White Noise - Baby Sweet Dream
これらの曲は、白色ノイズやブラウンノイズを使用したリラックス音楽で、眠りやすくするために作成されています。

生成されたSQLクエリも確認してみました。

SELECT track_name, artists FROM catalog_name.schema_name.table_name ORDER BY energy ASC LIMIT 5

"落ち着いた"を、"energyが低い"と解釈し、クエリを生成したようです。面白いです。

もう一つ。

クエリ: 平均的にエネルギッシュな曲をリリースしているアーティストは?

結果はこちら。

平均的にエネルギッシュな曲をリリースしているアーティストは、White Noise Research です。

生成されたSQLクエリはこちらです。

SELECT artists FROM catalog_name.schema_name.exp-spotify-tracks-dataset GROUP BY artists ORDER BY AVG(energy) DESC LIMIT 1

平均の計算を行い、さらにその結果を降順で並べる、というクエリを生成しています。良い感じに動作しています!

まとめ

ということで、LangGraphを使ってAzure DatabricksのTableの情報を参照して回答することが出来るLLMアプリを作ってみた話をご紹介しました。今回知れて良かったな、と思ったのが、Model Servingで提供されているFoundation Model API "databricks-meta-llama-3-3-70b-instruct"の回答精度が想像以上に良かった点です。色々な用途で活用出来そうだと感じました。

また、今回元々実現したかったこのアプリをModel Servingで提供する部分についてはもう少し調査が必要そうです。"Feature Serving"という機能を使うことになるのかな、と予想しているのですが、この機能をまだ利用したことがないので、まずはFeature Servingについて調べてみたいなと思います。

それから本日開発ツールのレビューサイトFindy Tools様にLangGraphについてのレビュー記事を寄稿いたしました!

findy-tools.io

LangGraphにご興味持たれたら、ぜひこちらの記事もご覧ください!