728x90

How to route between sub-chains

라우팅을 사용하면 이전 단계의 결과가 다음 단계를 정의하는 "non-deterministic(비결정적) chain"을 구현 할 수 있습니다.
라우팅은 state(상태)를 정의하고 상태에 대한 정보를 모델 호출 시에 context로 사용할 수 있여 모델과의 상호작용에 대한 구조와 일관성을 제공할 수 있습니다.

라우팅을 사용하는 두가지 방법이 있는데

  1. 조건부로 RunnableLambda에서 runnable을 반환
  2. Runnable Branch 사용(과거 방식)

본 실습에서는 두가지 방법을 모두 사용합니다.

두 단계로 구성되는데 첫 단계에서는 입력된 질문을 Langchain, Anthropic 혹은 Other로 분류하고,
두번째 단계에서는 상응하는 Prompt Chain으로 라우팅 하는 것입니다.

실습

우선 입력된 질문을 Langchain, Anthropic, Other로 분류하는 chain을 생성합니다.

from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate

chain = (
    PromptTemplate.from_template(
        """Given the user question below, classify it as either being about `LangChain`, `Anthropic`, or `Other`.

Do not respond with more than one word.

<question>
{question}
</question>

Classification:"""
    )
    | ChatOpenAI(model_name = 'gpt-3.5-turbo')
    | StrOutputParser()
)

chain.invoke({"question": "how do I call LangChain?"})

결과

langchain

chain의 역할은 Prompt를 통해서 사용자의 질문이 어떤 것에 관련된 질문인지 파악하는 것 입니다

Create SubChain

이제 3가지 subchain을 생성할 차례입니다.

langchain_chain = PromptTemplate.from_template(
    """You are an expert in langchain. \
Always answer questions starting with "As Harrison Chase told me". \
Respond to the following question:

Question: {question}
Answer:
"""
) | ChatOpenAI(model="gpt-3.5-turbo")

anthropic_chain = PromptTemplate.from_template(
    """You are an expert in anthropic. \
Always answer questions starting with "As Dario Amodei told me". \
Respond to the following question:

Question: {question}
Answer:"""
) | ChatOpenAI(model="gpt-3.5-turbo")

general_chain = PromptTemplate.from_template(
    """Respond to the following question:

Question: {question}
Answer:"""
) | ChatOpenAI(model="gpt-3.5-turbo")

각각의 체인 별로 prompt에 대해서 변화를 주었습니다.

1. Custom Function 사용(추천)

서로 다른 output에 대해서 사용자설정 함수를 라우팅할 수 있다.

def route(info):
    if "anthropic" in info['topic'].lower():
        return anthropic_chain
    elif "langchain" in info['topic'].lower():
        return langchain_chain
    else:
        return general_chain

route 함수의 역할은 info 딕셔너리에서 topic key로 값을 찾고 소문자로 만들었을때

그 정보가 anthropic 이면 anthropic_chain을 반환하고 langchain이면 langchain_chain을 반환합니다.

from langchain_core.runnables import RunnableLambda

full_chain = {'topic' : chain, 'question' : lambda x : x['question']} | RunnableLambda(route)

chain의 결과로 question이 들어오면 해당 question을 chain의 input으로 넣고 그 output을 "topic"의 value 값으로 설정한다.

이후 {'topic' : chain의 결과, 'question' : 사용자의 질문} 을 route 함수의 매개변수로 전달해서 route 함수를 실행한다.

full_chain.invoke({"question": "how do I use Anthropic?"})

결과

AIMessage(content='As Dario Amodei told me, you can use Anthropic by applying the principles of anthropic reasoning to understand the role of observers in the universe and how our existence shapes the laws of physics. This can help in exploring questions related to the nature of consciousness, the fine-tuning of the universe, and the implications of our presence in the cosmos.', response_metadata={'token_usage': {'completion_tokens': 72, 'prompt_tokens': 47, 'total_tokens': 119}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-912c72d0-ab79-4943-a13c-4fb1c533370f-0', usage_metadata={'input_tokens': 47, 'output_tokens': 72, 'total_tokens': 119})
full_chain.invoke({"question": "how do I use LangChain?"})

결과

AIMessage(content='As Dario Amodei told me, you can use Anthropic by applying the principles of anthropic reasoning to understand the role of observers in the universe and how our existence shapes the laws of physics. This can help in exploring questions related to the nature of consciousness, the fine-tuning of the universe, and the implications of our presence in the cosmos.', response_metadata={'token_usage': {'completion_tokens': 72, 'prompt_tokens': 47, 'total_tokens': 119}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-912c72d0-ab79-4943-a13c-4fb1c533370f-0', usage_metadata={'input_tokens': 47, 'output_tokens': 72, 'total_tokens': 119})
full_chain.invoke({"question": "how do I use LangChain?"})

결과

AIMessage(content='4', response_metadata={'token_usage': {'completion_tokens': 1, 'prompt_tokens': 24, 'total_tokens': 25}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-f1c1ae37-a7cc-4a17-bb0a-68e4d09d4687-0', usage_metadata={'input_tokens': 24, 'output_tokens': 1, 'total_tokens': 25})

2. Using a RunnableBranch

Runnable Branch는 input에 따라 조건과 runnable을 정의할 수 있는 특별한 종류의 runnable

이는 위에서 설명한 custom function으로 가능한 일들이라 custom function을 사용하는 것을 권장

RunnableBranch는 (condition, runnable) 쌍의 목록과 기본 실행 가능한 작업으로 초기화됨.

입력과 함께 각 조건을 전달하여 어떤 branch를 선택할지 결정

첫 번째로 True를 반환하는 조건을 선택하여 해당 조건과 연결된 runnable을 입력과 함께 실행

제공된 조건 중 어떤 것도 일치하지 않으면 기본 runnable을 실행

from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    ( lambda x : "anthropic" in x["topic"].lower(), anthropic_chain),
    ( lambda x : "langchain" in x["topic"].lower(), langchain_chain),
    general_chain,
)
full_chain = {"topic": chain, "question": lambda x : x['question']} | branch
full_chain.invoke({"question": "how do I use Anthropic?"})

결과

AIMessage(content='As Dario Amodei told me, you can use Anthropic by applying its principles and theories to study the relationship between humans and their environments, as well as the impact of human activities on the world around us. This can involve research, analysis, and critical thinking to better understand our place in the world and how we can work towards a more sustainable and harmonious existence.', response_metadata={'token_usage': {'completion_tokens': 76, 'prompt_tokens': 47, 'total_tokens': 123}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-9c9a4c74-9b79-4cb7-824b-6d3c830fd69a-0', usage_metadata={'input_tokens': 47, 'output_tokens': 76, 'total_tokens': 123})
  1. full_chain에서는 chain으로 topic의 값을 생성한 후 branch에 "topic"과 "question을 key로 하는 딕셔너리 객체를 branch에 전달
  2. branch에서는 topic을 키로 검색해서 해당하는 chain을 실행한다.
full_chain.invoke({"question": "how do I use LangChain?"})

결과

AIMessage(content='As Harrison Chase told me, to use LangChain, you need to first create an account and log in. Then, you can input text or upload documents to be analyzed for linguistic patterns and insights. You can also explore the various features and tools available on the platform to enhance your language analysis capabilities.', response_metadata={'token_usage': {'completion_tokens': 60, 'prompt_tokens': 44, 'total_tokens': 104}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-335488fb-1ead-4e2c-811b-aea768653bb2-0', usage_metadata={'input_tokens': 44, 'output_tokens': 60, 'total_tokens': 104})
full_chain.invoke({"question": "whats 2 + 2"})

결과

AIMessage(content='4', response_metadata={'token_usage': {'completion_tokens': 1, 'prompt_tokens': 24, 'total_tokens': 25}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-3f4c3f6e-92d2-45d9-8b49-6359bcb9e26b-0', usage_metadata={'input_tokens': 24, 'output_tokens': 1, 'total_tokens': 25})

3. Routing by Semantic Similarity

Embedding을 사용해서 사용자의 query를 가장 관련성 높은 prompt에 라우팅할 수 있다.

from langchain_community.utils.math import cosine_similarity
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

physics_template = """You are a very smart physics professor. \
You are great at answering questions about physics in a concise and easy to understand manner. \
When you don't know the answer to a question you admit that you don't know.

Here is a question:
{query}"""

math_template = """You are a very good mathematician. You are great at answering math questions. \
You are so good because you are able to break down hard problems into their component parts, \
answer the component parts, and then put them together to answer the broader question.

Here is a question:
{query}"""

embeddings = OpenAIEmbeddings()
prompt_templates = [physics_template, math_template]
prompt_embeddings = embeddings.embed_documents(prompt_templates)

우선 physics_templatemath_template을 작성하고 Embedding Model 객체를 생성한다.

prompt_templates라는 리스트를 만들고 prompt template을 저장합니다.

이후 Embedding Model로 Prompt Template 들의 임베딩을 생성합니다

def prompt_router(input):
    query_embedding = embeddings.embed_query(input["query"])
    similarity = cosine_similarity([query_embedding],prompt_embeddings)[0]
    most_similar = prompt_templates[similarity.argmax()]
    print("Using MATH" if most_similar == math_template else "Using PHYSICS")
    return PromptTemplate.from_template(most_similar)

chain = (
    {"query" : RunnablePassthrough()}
    | RunnableLambda(prompt_router)
    | ChatOpenAI(model = "gpt-3.5-turbo")
    | StrOutputParser()
)
  1. 우선 prompt_router 함수는 사용자의 질문이 담긴 딕셔너리 객체를 입력으로 받고 해당 질문을 임베딩한다.
  2. 질문 임베딩과 template 임베딩을 비교해서 가장 비슷한 template을 PromptTemplate 객체로 만들어서 반환한다.
  3. 이후 RunnableLambdaprompt_router 함수를 넣어서 chain을 구성한다.

따라서, query를 넣어서 invoke하면 질문과 관련이 높은 Prompt Template을 사용하여 자동으로 답변을 생성한다.

print(chain.invoke("What's a black hole"))

결과

Using PHYSICS
A black hole is a region in space where the gravitational pull is so strong that nothing, not even light, can escape from it. It is formed when a massive star collapses in on itself and the remaining mass is compressed into a very small space. The center of a black hole is called a singularity, where the laws of physics as we know them break down. Black holes can vary in size, with supermassive black holes found at the centers of galaxies and smaller stellar black holes formed from the remnants of massive stars.
print(chain.invoke("What's a path integral"))

결과

Using MATH
A path integral is a concept in mathematics and physics, specifically in the field of quantum mechanics. It involves integrating along all possible paths that a particle could take from one point to another, taking into account the probability amplitudes associated with each possible path. This approach allows for the calculation of quantum mechanical quantities such as the probability of a particle transitioning from one state to another. The mathematics behind path integrals can be quite complex, but they provide a powerful tool for understanding the behavior of particles at the quantum level.

 

'인공지능 > RAG' 카테고리의 다른 글

LangChain - Dynamic Chain  (1) 2024.06.04
LangChain - Runnable  (0) 2024.06.04
LangChain - RunnablePassTrough  (0) 2024.06.04
LangChain (11) 오픈소스 LLM으로 RAG 구현  (0) 2024.06.04
LangChain (10) Gemini로 RAG 구현  (0) 2024.06.04
728x90

How to create a dynamic (self-constructing) chain

  • chain의 일부를 입력값에 따라 runtime에 할당하고 싶을때 사용하는 기법이다.
  • RunnableLambda의 속성을 활용해서 Dynamic chain을 구성 할 수 있다.

실습

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model = "gpt-3.5-turbo")
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnablePassthrough, chain

contextualize_instructions = """
Convert the latest user question into a standalone question given the chat history. 
Don't answer the question, return the question and nothing else (no descriptive text).
"""

contextualize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_instructions),
        ("placeholder", "{chat_history}"),
        ("human","{question}"),
    ]
)

contextualize_question = contextualize_prompt | llm | StrOutputParser()

위에서 contextualize_instructions를 통해서 채팅 기록을 기반으로 새로운 질문을 생성하는 chain을 구성한다.

qa_instructions = (
    """Answer the user question given the following context:\n\n{context}."""
)

qa_prompt = ChatPromptTemplate.from_messages(
    [('system',qa_instructions),('human','{question}')]    
)

Chat Prompt Template을 구성한다.

@chain
def contextualize_if_needed(input_ : dict) -> Runnable:
    if input_.get("chat_history"):
        #이 과정을 통해서 실제 출력값을 생성하는건 아니고 또 다른 Runnable을 반환한다.
        return contextualize_question
    else:
        return RunnablePassthrough()

@chain
def fake_retriever(input_ : dict) -> str:
    return "egypt's population in 2024 is about 111 million"

full_chain = (
    RunnablePassthrough.assign(qusetion = contextualize_if_needed).assign(
        context=fake_retriever
    )
    | qa_prompt
    | llm
    | StrOutputParser()
)
  • 만약에 RunnablePassThrough를 통해서 입력된 dict() 객체에 "chat_history"라는 key가 있다면 contextualize_question이라는 Runnable을 반환한다.
    • 반환된 Runnable은 full chain이 실행되면 스스로 실행된다.
  • 만약에 없다면 RunnablePassThrough를 반환한다.

여기서 Runnable이란 invoked, batched, streamed, transformed and composed 될 수 있는 일의 단위이다.

  • invoke/ainvoke: 하나의 input을 output으로 변경시켜주는 메서드
  • batch/abatch: 여러 개의 input을 output으로 변경시켜주는 메서드
  • stream/astream: 결과물을 stream 해준다.
  • astream_log: 출력 및 입력에서 선택된 중간 결과를 Stream 해준다.

Runnable을 구성하는 주요 primitive(구성 요소)는 RunnableSequence and RunnableParallel이다.

  • RunnableSequence는 한 Runnable의 결과물을 다음 과정의 input으로 사용함으로써 Runnable들을 순차적으로 invoke한다.
    • | 연산자를 사용하거나 Runnable의 list를 통해서 구성한다.
  • RunnableParallel은 동일한 input을 여러 Runnable들에 전달함으로써 Runnable들을 동시에 invoke한다.
    • dict 타입을 활용해서 구성한다.

출처 : 링크

full_chain.invoke(
    {
        "question": "what about egypt",
        "chat_history": [
            ("human", "what's the population of indonesia"),
            ("ai", "about 276 million"),
        ],
    }
)

결과

"Egypt's population in 2024 is about 111 million."
for chunk in contextualize_if_needed.stream(
    {
        "question" : "what about egypt",
        "chat_history": [
            ("human", "what's the population of indonesia"),
            ("ai", "about 276 million"),
        ],
    }
):
    print(chunk)

결과

What
 is
 the
 population
 of
 Egypt
?

'인공지능 > RAG' 카테고리의 다른 글

LangChain - Routing Chain  (1) 2024.06.04
LangChain - Runnable  (0) 2024.06.04
LangChain - RunnablePassTrough  (0) 2024.06.04
LangChain (11) 오픈소스 LLM으로 RAG 구현  (0) 2024.06.04
LangChain (10) Gemini로 RAG 구현  (0) 2024.06.04
728x90

Runnable

Runnable이란 invoked, batched, streamed, transformed and composed 될 수 있는 일의 단위이다.

  • invoke/ainvoke: 하나의 input을 output으로 변경시켜주는 메서드
  • batch/abatch: 여러 개의 input을 output으로 변경시켜주는 메서드
  • stream/astream: 결과물을 stream 해준다.
  • astream_log: 출력 및 입력에서 선택된 중간 결과를 Stream 해준다.

Runnable을 구성하는 주요 primitive(구성 요소)는 RunnableSequence and RunnableParallel이다.

  • RunnableSequence는 한 Runnable의 결과물을 다음 과정의 input으로 사용함으로써 Runnable들을 순차적으로 invoke한다.
    • | 연산자를 사용하거나 Runnable의 list를 통해서 구성한다.
  • RunnableParallel은 동일한 input을 여러 Runnable들에 전달함으로써 Runnable들을 동시에 invoke한다.
    • dict 타입을 활용해서 구성한다.

실습

1. Runnable Sequence

from langchain_core.runnables import RunnableLambda

# | 연산자를 통해 RunnableSequence를 구성하였다
sequence = RunnableLambda(lambda x : x + 1) | RunnableLambda(lambda x : x * 2)
sequence.invoke(1)

1이 들어가서 RunnableLambda에 의해 1+1 연산이 먼저 이루어지고

그 결과인 2가 두번째 RunnableLambda의 입력값으로 들어가서 2 * 2 연산이 이루어진다.

최종 결과는 4

sequence.batch([1,2,3])

앞선 과정이 병렬적으로 동시에 일어난다.

2. Runnable Parallel

# A sequence that contains a RunnableParallel constructed using a dict literal
sequence = RunnableLambda(lambda x: x + 1) | {
    'mul_2': RunnableLambda(lambda x: x * 2),
    'mul_5': RunnableLambda(lambda x: x * 5)
}
sequence.invoke(1)

위 코드는 우선 첫번째 RunnableLambda가 수행된 이후 2가지 RunnableLambda가 병렬적으로 수행된다.

이런 특성을 활용하면 서로 다른 2가지 Retriever를 동시에 적용하여 문서를 검색할 수 있다.

가령 lexical search(키워드 기반)와 semantic search(유사도 기반)를 따로 수행하고 검색한 문서를 기반으로 ReRanking을 수행해서 LLM의 최종 답변을 얻을 수 있다.

3. Runnable Method 사용하기

모든 Runnable은 동작을 수정(실패시 재시도, configurable하게 만들기)하는 데 사용할 수 있는 추가 메서드가 있다.

from langchain_core.runnables import RunnableLambda
import random

def add_one(x: int) -> int:
    return x + 1

def buggy_double(y: int) -> int:
    # 70%의 확률로 fail하는 buggy code
    if random.random() > 0.3 :
        print("This code failed, will probably be retried")
        raise ValueError("Triggerd Buggy Code!")
    return y * 2

sequence = (
    RunnableLambda(add_one) |
    RunnableLambda(buggy_double).with_retry(    # 실패시 재시도하는 메서드
        stop_after_attempt=10,
        wait_exponential_jitter=False
    )
)

print(sequence.input_schema.schema())
print(sequence.output_schema.schema())
print(sequence.invoke(2))

위 코드를 살펴보면 add_one함수는 1을 더해주고 buggy_double 함수는 random 값을 구해서 0.3 이상인 경우
ValueError를 raise하는 코드이다.

RunnableSequence 객체는 input에 대해 add_one 함수를 적용시키고 그 결과를 다시 input으로

buggy_double 함수를 실행하는데 with_retry 메서드를 실행하는데 이 함수는 stop_after_attempt

통해서 몇 번 retry할지 설정할 수 있고 재시도 대기 시간에 wait_exponential_jitter를 통해 jitter를 사용 할 지 설정 할 수 있다.

 

Attributes

InputType The type of input this runnable accepts specified as a type annotation.
OutputType The type of output this runnable produces specified as a type annotation.
config_specs List configurable fields for this runnable.
input_schema The type of input this runnable accepts specified as a pydantic model.
name The name of the runnable.
output_schema The type of output this runnable produces specified as a pydantic model.

Methods

__init__()
abatch(inputs[, config, return_exceptions]) Default implementation runs ainvoke in parallel using asyncio.gather.
abatch_as_completed() Run ainvoke in parallel on a list of inputs, yielding results as they complete.
ainvoke(input[, config]) Default implementation of ainvoke, calls invoke from a thread.
assign(**kwargs) Assigns new fields to the dict output of this runnable.
astream(input[, config]) Default implementation of astream, which calls ainvoke.
astream_events(input[, config, ...]) [Beta] Generate a stream of events.
astream_log() Stream all output from a runnable, as reported to the callback system.
atransform(input[, config]) Default implementation of atransform, which buffers input and calls astream.
batch(inputs[, config, return_exceptions]) Default implementation runs invoke in parallel using a thread pool executor.
batch_as_completed() Run invoke in parallel on a list of inputs, yielding results as they complete.
bind(**kwargs) Bind arguments to a Runnable, returning a new Runnable.
config_schema(*[, include]) The type of config this runnable accepts specified as a pydantic model.
get_graph([config]) Return a graph representation of this runnable.
get_input_schema([config]) Get a pydantic model that can be used to validate input to the runnable.
get_name([suffix, name]) Get the name of the runnable.
get_output_schema([config]) Get a pydantic model that can be used to validate output to the runnable.
get_prompts([config])
invoke(input[, config]) Transform a single input into an output.
map() Return a new Runnable that maps a list of inputs to a list of outputs, by calling invoke() with each input.
pick(keys) Pick keys from the dict output of this runnable.
pipe(*others[, name]) Compose this Runnable with Runnable-like objects to make a RunnableSequence.
stream(input[, config]) Default implementation of stream, which calls invoke.
transform(input[, config]) Default implementation of transform, which buffers input and then calls stream.
with_config([config]) Bind config to a Runnable, returning a new Runnable.
with_fallbacks(fallbacks, *[, ...]) Add fallbacks to a runnable, returning a new Runnable.
with_listeners(*[, on_start, on_end, on_error]) Bind lifecycle listeners to a Runnable, returning a new Runnable.
with_retry(*[, retry_if_exception_type, ...]) Create a new Runnable that retries the original runnable on exceptions.
with_types(*[, input_type, output_type]) Bind input and output types to a Runnable, returning a new Runnable.

 

 

출처 : 링크

'인공지능 > RAG' 카테고리의 다른 글

LangChain - Routing Chain  (1) 2024.06.04
LangChain - Dynamic Chain  (1) 2024.06.04
LangChain - RunnablePassTrough  (0) 2024.06.04
LangChain (11) 오픈소스 LLM으로 RAG 구현  (0) 2024.06.04
LangChain (10) Gemini로 RAG 구현  (0) 2024.06.04
728x90

RunnablePassThrough

chain을 여러 단계로 구성할때 전 단계의 출력값을 바로 다음 단계의 입력값으로 사용해야하는 경우가 있는데

이런 경우에 RunnablePassThrough를 사용하면 됨. 일반적으로 RunnableParallel이랑 같이 사용됨

1. API key 설정

%pip install -qU langchain langchain-openai

import os
from getpass import getpass

os.environ["OPENAI_API_KEY"] = getpass()

2. RunnablePassTrough 구조 살피기

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

runnable = RunnableParallel(
    passed = RunnablePassthrough(),
    modified = lambda x : x["num"] + 1,
)

runnable.invoke({"num": 1})
  • 위 코드에서는 RunnablePassThrough()를 통해서 값을 전달받고 passed 변수에 저장함.
  • 이후 modified에 수정된 값을 저장함.
  • modified에는 람다식으로 앞서 passed 된 값에 1을 더해줌.
    • lambda x : 여기서 x는 입력값
    • x["num"] + 1 : x는 딕셔너리 값이므로 num이라는 key를 가진 value에 1을 더해줌

3. Retrieval Example

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
model = ChatOpenAI()

retrieval_chain = (
    {"context" : retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

retrieval_chain.invoke("where did harrison work?")
  • Retrival_chain은 {"context", "question"}, prompt, model, StrOutputParser()로 구성된다.
  • "context"는 vector store에서 검색을 해서 가져오는 문자열
  • "question"은 RunnablePassThrough 즉,invoke() 안의 문자열이다.
  • 이후 "context"와 "question"이 prompt에 formatting 된다.
  • 이후 이 prompt로 model에 질의를 하고
  • StrOutputParser()를 통해서 결과를 문자열로 출력한다.

StrOutputParser()가 없는 경우 아래와 같다.

retrieval_chain = (
    {"context" : retriever, "question": RunnablePassthrough()}
    | prompt
    | model
)
retrieval_chain.invoke("where did harrison work?")

'인공지능 > RAG' 카테고리의 다른 글

LangChain - Dynamic Chain  (1) 2024.06.04
LangChain - Runnable  (0) 2024.06.04
LangChain (11) 오픈소스 LLM으로 RAG 구현  (0) 2024.06.04
LangChain (10) Gemini로 RAG 구현  (0) 2024.06.04
What is Elastic Search?  (0) 2024.06.03
728x90

오픈소스 LLM의 필요성

  • 비용 : API 호출시마다 비용 발생
  • 보안 : 기업 정보 유출 우려
  • 안정성 : API를 관리하는 회사에서 문제가 생기면 서비스 제공이 불가

한국의 오픈소스 LLM

업스테이지와 NIA가 주관하여 한국어 오픈소스 LLM 리더보드 운영중. 이 리더보드를 통해서 최신 오픈소스 LLM들의 성능 비교하고 사용 가능

오픈 소스 LLM의 양자화(Quantization)

컴퓨팅 리소스 부담을 줄이기 위해 양자화를 통해 경량화한 모델을 많이 활용

https://medium.com/@florian_algo/model-quantization-1-basic-concepts-860547ec6aa9

모델 양자화를 통해 가중치 정확도(소수점)을 줄여서 계산량과 저장해야하는 정보를 줄인다.

실습

1단계 - LLM 양자화에 필요한 패키지 설치

  • bitsandbytes: Bitsandbytes는 CUDA 사용자 정의 함수, 특히 8비트 최적화 프로그램, 행렬 곱셈(LLM.int8()) 및 양자화 함수에 대한 경량 래퍼
  • PEFT(Parameter-Efficient Fine-Tuning): 모델의 모든 매개변수를 미세 조정하지 않고도 사전 훈련된 PLM(언어 모델)을 다양한 다운스트림 애플리케이션에 효율적으로 적용 가능
  • accelerate: PyTorch 모델을 더 쉽게 여러 컴퓨터나 GPU에서 사용할 수 있게 해주는 도구
#양자화에 필요한 패키지 설치
!pip install -q -U bitsandbytes
!pip install -q -U git+https://github.com/huggingface/transformers.git
!pip install -q -U git+https://github.com/huggingface/peft.git
!pip install -q -U git+https://github.com/huggingface/accelerate.git

2단계 - 트랜스포머에서 BitsandBytesConfig를 통해 양자화 매개변수 정의하기

  • load_in_4bit=True: 모델을 4비트 정밀도로 변환하고 로드하도록 지정
  • bnb_4bit_use_double_quant=True: 메모리 효율을 높이기 위해 중첩 양자화를 사용하여 추론 및 학습
  • bnd_4bit_quant_type="nf4": 4비트 통합에는 2가지 양자화 유형인 FP4와 NF4가 제공됨. NF4 dtype은 Normal Float 4를 나타내며 QLoRA 백서에 소개되어 있습니다. 기본적으로 FP4 양자화 사용
  • bnb_4bit_compute_dype=torch.bfloat16: 계산 중 사용할 dtype을 변경하는 데 사용되는 계산 dtype. 기본적으로 계산 dtype은 float32로 설정되어 있지만 계산 속도를 높이기 위해 bf16으로 설정 가능
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

3단계 - 경량화 모델 로드하기

이제 모델 ID를 지정한 다음 이전에 정의한 양자화 구성으로 로드합니다.

model_id = "kyujinpy/Ko-PlatYi-6B"

tokenizer=AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id,quantization_config=bnb_config,device_map="auto")
  • AutoTokenizer.from_pretrained : model_id 에서 사용된 토크나이저를 자동으로 가져온다.
  • AutoModelForCausalLM.from_pretrained : model_id 에서 사용된 언어모델을 자동으로 가져온다.
    • quantization_config : 경량화 기법 어떻게 할건지 정의하는 부분. 우리는 위에서 정의한 BitsAndBytes config를 전달한다.
    • device_map : auto로해서 어떤 gpu에서 돌아갈지 자동으로 설정해준다.
print(model)
  • LLama 기반 모델
  • (embed_tokens): Embedding(78464, 4096, padding_idx=0) 부분을 살펴보면 78464 토큰이 들어오면 4096차원으로 임베딩 시킨다는 뜻이다.
  • 모델을 print를 주는 이유는 양자화 잘 되었는지 확인하기 위해서이다.
    • Attention 부분에 Linear4bit으로 돼있으면 4bit로 양자화가 잘 된거다.

4단계 - 잘 실행되는지 확인

device = "cuda:0"

messages = [
    {"role":"user","content":"은행의 기준 금리에 대해서 설명해줘"}
]

encodes = tokenizer.apply_chat_template(messages, return_tensors="pt")

model_inputs = encodes.to(device)

generated_ids = model.generate(model_inputs,max_new_tokens=1000,do_sample=True)
decoded = tokenizer.batch_decode(generated_ids)
print(decoded[0])
  • device를 cuda 첫번째 걸로 정의(gpu로 설정한다고 생각하자)
  • tokenizer의 apply_chat_template을 통해 messages를 토크나이저에 넣고 인코딩한다.
  • return_tensors = 'pt'로 설정해서 pytorch tensor 형식으로 메시지를 변환해준다.
  • 인코딩한 값을 device(gpu)에 넣는다.
  • 모델의 generate 함수를 통해서 model_input을 넣어서 결과를 출력받는다
    • 최대 토큰 설정
    • sampling은 뭐지?
  • 토크나이저의 batch_decode를 통해서 LLM이 생성한 임베딩을 자연어로 변환해준다.

RAG 시스템 구현

!pip -q install langchain pypdf chromadb sentence-transformers faiss-cpu
from langchain.llms import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from transformers import pipeline
from langchain.chains import LLMChain

text_generation_pipeline = pipeline(
    model = model,
    tokenizer=tokenizer,
    task="text-generation",
    temperature=0.2,
    return_full_text = True,
    max_new_tokens=300,
)

prompt_template = """
### [INST]
Instruction: Answer the question based on your bank economy knowledge.
Here is context to help:

{context}

### QUESTION:
{question}

[/INST]
 """

koplatyi_llm = HuggingFacePipeline(pipeline=text_generation_pipeline)

#Create Prompt from Prompt Template
prompt = PromptTemplate(
    input_variables=["context","question"],
    template=prompt_template,
)

#create llm chain
llm_chain = LLMChain(llm=koplatyi_llm,prompt=prompt)
  • HuggingFacePipelineLLMChain은 API를 통해서 답변을 얻으면 필요 없는 라이브러리
  • 모델에 질문을 하고 답변을 받는 일련의 과정을 pipeline을 통해서 수행
  • HuggingFacePipeline을 사용하는 이유는 langchain과 결합할때 허깅페이스 파이프라인과 결합하는게 더 편해서이다.
  • LLMChain을 통해서 어떤 prompt template으로 어떤 llm을 돌리냐만 정의해주면 chain이 완성된다.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.document_loaders import PyPDFLoader
from langchain.schema.runnable import RunnablePassthrough
loader = PyPDFLoader("/content/drive/MyDrive/코딩/LangChain 실습(모두의AI)/data/[이슈리포트 2022-2호] 혁신성장 정책금융 동향.pdf")
pages = loader.load_and_split()

text_splitter = RecursiveCharacterTextSplitter(chunk_size = 500, chunk_overlap=50)
texts = text_splitter.split_documents(pages)

from langchain.embeddings import HuggingFaceEmbeddings

model_name = "jhgan/ko-sbert-nli"
encode_kwargs={"normalize_embeddings":True}
hf = HuggingFaceEmbeddings(
    model_name = model_name,
    encode_kwargs=encode_kwargs
)

db = FAISS.from_documents(texts,hf)
retriever = db.as_retriever(
    search_type = "similarity",
    search_kwargs={'k':3}
)
rag_chain = (
    {"context":retriever,"question":RunnablePassthrough()}
    | llm_chain
)

rag_chain : rag_chain에 들어오는 값을 question으로 삼아서 해당 질문을 토대로 retriever를 통해서 context를 넣을 내용을 가져온다.

그리고 llm_chain으로 들어오는 질문과 context를 넣어서 답변을 생성한다.

import warnings
warnings.filterwarnings('ignore')
result = rag_chain.invoke("혁신성장 정책 금융에서 인공지능이 중요한가?")

for i in result['context']:
    print(f"주어진 근거: {i.page_content} / 출처: {i.metadata['source']} - {i.metadata['page']} \n\n")

print(f"\n답변:{result['text']}")

Reference

모두의 AI[https://youtu.be/04jCXo5kzZE?si=W6eZI0TYRcc28REw]

'인공지능 > RAG' 카테고리의 다른 글

LangChain - Runnable  (0) 2024.06.04
LangChain - RunnablePassTrough  (0) 2024.06.04
LangChain (10) Gemini로 RAG 구현  (0) 2024.06.04
What is Elastic Search?  (0) 2024.06.03
Langchain - Ensemble Retriever  (1) 2024.05.31
728x90

실습 내용 자체는 앞서 수행한 과정과 거의 동일하다.
LLM이 Gemini로 바뀐것 뿐이다.

실습

from IPython.display import Markdown

import os
os.environ["GOOGLE_API_KEY"] = "YOUR_API_KEY"
%pip install -U --quiet langchain-google-genai
from langchain_google_genai import ChatGoogleGenerativeAI

LLM 정의 및 체크

llm = ChatGoogleGenerativeAI(model = "gemini-pro")
result = llm.invoke("네이버에 대해 보고서를 작성해줘")
Markdown(result.content)

Dependency 설치

%pip install -U --quiet langchain tiktoken pypdf sentence_transformers chromadb
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.document_loaders import PyPDFLoader

PDF Loader & Text Splitter & Embedding

loader = PyPDFLoader("/content/drive/MyDrive/코딩/LangChain 실습(모두의AI)/data/[이슈리포트 2022-2호] 혁신성장 정책금융 동향.pdf")
pages = loader.load_and_split()

text_splitter = RecursiveCharacterTextSplitter(chunk_size = 500, chunk_overlap = 50)
texts = text_splitter.split_documents(pages)

from langchain.embeddings import HuggingFaceEmbeddings

model_name = "jhgan/ko-sbert-nli"
model_kwargs={'device':'cpu'}
encode_kwargs={'normalize_embeddings': True}
hf = HuggingFaceEmbeddings(
    model_name = model_name,
    model_kwargs = model_kwargs,
    encode_kwargs = encode_kwargs
)

docsearch = Chroma.from_documents(texts,hf)

Retriever

retriever = docsearch.as_retriever(
    search_type = 'mmr',
    search_kwargs={'k':3,'fetch_k':10}
)
retriever.get_relevant_documents("혁신성장 정책 금융에 대해서 설명해줘")

Prompt Template

from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnableMap

template = """Answer the question as based only on the following context:
{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

chain 구성

gemini = ChatGoogleGenerativeAI(model = "gemini-pro", temperature=0)

chain = RunnableMap({
    "context":lambda x: retriever.get_relevant_documents(x['question']),
    "question":lambda x: x['question']
}) | prompt | gemini

RunnableMap으로 관련된 문서를 context로, 질문은 question으로 Prompt Template에 전달하고 이를 기반으로
LLM이 답변을 생성한다.

결과 출력

Markdown(chain.invoke({'question':"혁신성장 정책금융에 대해서 설명해줘"}).content)

LLM의 답변을 markdown 형식으로 출력해준다.

Reference

모두의 AI[https://www.youtube.com/@AI-km1yn]

'인공지능 > RAG' 카테고리의 다른 글

LangChain - RunnablePassTrough  (0) 2024.06.04
LangChain (11) 오픈소스 LLM으로 RAG 구현  (0) 2024.06.04
What is Elastic Search?  (0) 2024.06.03
Langchain - Ensemble Retriever  (1) 2024.05.31
Langchain - Hybrid Search 구현  (0) 2024.05.31
728x90

Elastic Search는 검색 엔진.

관계형 DB(RDBMS)와의 관계

관계형 데이터베이스에 대응되는 Elastic Search의 개념들

데이터 접근 방식

Elastic Search는 Rest API를 통한 데이터 조작이 가능하다.

Inverted Index

Elastic Search의 가장 큰 특징이라고 하면 inverted index이다.
일반적인 index는 문서를 빠르게 찾기 위해서 문서의 위치를 가르키지만

inverted index는 단어들에 대해서 어떤 단어가 어느 문서에 포함되어있는지를 기록합니다.

가령, 아래와 같은 데이터베이스가 있다고 합시다.

ID Name
1 Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks
2 Retrieval-Augmented Generation for Large Language Models: A Survey
3 A Survey on Retrieval-Augmented Text Generation
4 Benchmarking Large Language Models in Retrieval-Augmented Generation
5 Generation-Augmented Retrieval for Open-domain Question Answering
6 MEASURING MASSIVE MULTITASK LANGUAGE UNDERSTANDING

위 테이블에서 Retrieval 이라는 단어가 포함된 행을 가져오려면 ID = 1 부터 테이블의 끝까지 반복적으로 검색을 해야합니다.

따라서, 데이터의 개수가 많아질수록 검색할 대상이 늘어나고 row 안의 내용을 모두 읽어야하므로 검색 속도가 매우 느립니다.

반면에 아래와 같이 inverted index를 적용한 테이블을 구성하는 경우를 생각해 볼 수 있습니다.

Term ID
Retrieval 1,2,3,4,5
Augmented 1,2,3,4,5
Generation 1,2,3,4,5
MEASURING 6
Language 1,2,3,4,5,6
for 1,2,5
Open 5

Elastic Search에서는 위와 같은 inverted index를 구성해서 데이터가 늘어나더라도 큰 속도 저하 없이 빠르게 검색이 가능합니다.

Elastic Search에서는 추출된 각 키워드를 Term이라고 부릅니다.

Elastic Search의 특징

  • Elastic Search는 분산 처리를 통해 빠른 검색이 가능하고 대량의 비정형 데이터 검색이 가능하다.
  • 전문 검색과 구조 검색 모두를 지원한다.
  • 검색 엔진이지만 대용량 스토리지로 활용이 가능하다.

Elastic Search의 장점

  • 오픈소스
  • 전문 검색
    • 내용 전체를 indexing해서 특정 단어가 포함된 단어를 검색할 수 있다.
  • RESTful API
    • RestAPI 형식으로 요청하고 json format을 사용하기 때문에 개발 환경에 독립적으로 사용 가능하다.
  • Schemaless
    • 비정형 데이터에 대해서도 indexing과 검색이 가능
  • Inverted Index
    • 앞서 설명한대로 데이터의 양이 증가해도 검색 속도에 큰 영향이 없다.
  • Shard
    • 분산 구성이 가능해서 확장성이 높다.Elastic Search의 단점
  • Transaction Rollback 지원 X
    • 성능을 위해서 Rollback, Transaction을 지원하지 않는다..(??)
  • 데이터의 업데이트를 제공하지 않는다
    • 업데이터를 하는 경우 기존 문서를 제거하고 새로운 문서를 생성한다.

용어 설명

1. Indexing

  • 데이터를 검색 가능한 구조로 바꾸는 과정
  • 원본 문서를 검색어 토큰으로 변환 후 저장

2. Document

  • 단일 데이터 단위
  • RDBMS의 Row라고 생각하면 된다

3. Index

  • Document를 모아놓은 집합
  • Indexing이 완료된 데이터가 저장되는 공간
  • 문서들의 논리적인 집합 단위

4. Shard

  • index가 분리되는 단위
  • 각 노드에 분산되어 저장
  • 단일 검색 인스턴스
  • 클러스터에 노드를 추가하면 샤드들이 각 노드들로 분산되고 디폴트로 1개의 복제본을 생성
  • 처음 생성된 샤드를 primary shard라 하고, 이후에 생성된 샤드를 replica하 함
  • 노드가 1개인 경우 primary shard만 존재
  • primary shard와 replica는 필연적으로 동일한 데이터를 저장하고 데이터베이스 안전성을 위해 서로 다른 노드에 저장됩니다.

5. Master Node

  • 인덱스의 메타 데이터, 클러스터 상태 정보를 관리하는 노드.
  • 클러스터당 1개의 마스터 노드가 존재
  • 마스터 후보 노드가 존재하며 마스터 노드가 모종의 이유로 다운되면 후보 노드중 하나가 마스터 노드의 역할을 수행

6. Data Node

  • 실제 indexing된 데이터가 저장된 노드

Reference

  1. https://www.elastic.co/guide/en/elasticsearch/reference/current/elasticsearch-intro.html
  2. https://esbook.kimjmin.net/
  3. https://jaemunbro.medium.com/elastic-search-%EA%B8%B0%EC%B4%88-%EC%8A%A4%ED%84%B0%EB%94%94-ff01870094f0
728x90

Ensemble Retriever란?

  • 앙상블 검색기는 Retriever의 list를 받아서 각각의 Retriever로 get_relevant_documents 방법으로 검색한 결과를 종합한 뒤 Reciprocal Rank Fusion 알고리즘으로 결과를 rerank한다.
  • Similarity search 만으로 성능이 잘 나오지 않을 때 사용하기 좋은 방법론이다.
  • 가장 흔히 사용하는 방식은 sparse retriever(BM25 같이 키워드 기반)와 dense retriever(embedding similarity)를 함께 사용하는 것이다.
  • Hybrid Search 라고도 알려져있고 sparse retriever는 keyword 기반으로 문서를 찾는데 특화되어 있고 dense retriever는 의미론적 유사도 기반으로 문서를 찾는데 특화되어 있으므로 두 종류를 합쳐서 사용하면 효과가 좋다.관련 문서 링크
  • Reciprocal Rank Fusion에 대한 자세한 설명은 👉여기
  • Ensemble Retriever 관련 Langchain 문서 👉여기
  • BM25 Retriever 관련 Langchain 문서 👉여기

Dependency 설치

pip install --upgrade --quiet  rank_bm25 > /dev/null
pip install rank_bm25
pip install faiss-cpu
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

실습

Retriever 정의

문자열이 저장된 리스트 2개를 선언한다. 이후 bm25_retriever와 faiss_retriever를 정의한다.
metadatas에 source를 정해주고 가져올 문서의 개수는 k로 설정한다.

이후 ensemble_retriever를 구성하고 weights로 어떤 retriever의 결과를 더 반영할 지 정한다.

doc_list_1 = [
    "I like apples",
    "I like oranges",
    "Apples and oranges are fruits",
]

# initialize the bm25 retriever and faiss retriever
bm25_retriever = BM25Retriever.from_texts(
    doc_list_1, metadatas=[{"source": 1}] * len(doc_list_1)
)
bm25_retriever.k = 2

doc_list_2 = [
    "You like apples",
    "You like oranges",
]

embedding = OpenAIEmbeddings()
faiss_vectorstore = FAISS.from_texts(
    doc_list_2, embedding, metadatas=[{"source": 2}] * len(doc_list_2)
)
faiss_retriever = faiss_vectorstore.as_retriever(search_kwargs={"k": 2})

# initialize the ensemble retriever
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_retriever], weights=[0.5, 0.5]

이제 결과를 살펴보자

docs = ensemble_retriever.invoke("apples")
docs

결과

[Document(page_content='You like apples', metadata={'source': 2}),
 Document(page_content='I like apples', metadata={'source': 1}),
 Document(page_content='You like oranges', metadata={'source': 2}),
 Document(page_content='Apples and oranges are fruits', metadata={'source': 1})]

Runtime Configuration

개발자는 runtime에 retriever들을 구성할 수 있다. 이를 위해서는 우선 field들을 configurable 할 수 있게 표시해야한다.

 from langchain_core.runnables import ConfigurableField

아래 과정을 통해서 개발자는 runtime에 faiss_retriever가 몇개의 문서를 반환할 지 결정할 수 있다.

 faiss_retriever = faiss_vectorstore.as_retriever(
    search_kwargs={"k": 2}
).configurable_fields(
    search_kwargs=ConfigurableField(
        id="search_kwargs_faiss",
        name="Search Kwargs",
        description="The search kwargs to use",
    )
)
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_retriever], weights=[0.5, 0.5]
)
config = {"configurable": {"search_kwargs_faiss": {"k": 1}}}
docs = ensemble_retriever.invoke("apples", config=config)
docs

'인공지능 > RAG' 카테고리의 다른 글

LangChain (10) Gemini로 RAG 구현  (0) 2024.06.04
What is Elastic Search?  (0) 2024.06.03
Langchain - Hybrid Search 구현  (0) 2024.05.31
MongoDB와 Gemma를 사용한 RAG 실습(LangChain X)  (0) 2024.05.28
Langchain - MessagesPlaceholder  (0) 2024.05.24
728x90

전제 조건

Vector Store에서 Hybrid Search를 지원해야 하기 때문에 이를 지원하는 vector store를 선정한다.

본 예제에서는 Astra DB를 사용한다.

Dependency 설치

우선 관련 Dependency를 설치한다. 터미널에 접속후 아래 명령어를 실행한다.

!pip install "cassio>=0.1.7"

Astra DB 회원 가입 및 DB 생성

이후 링크에 접속해서 회원가입을 하고 연결 정보를 생성한다. 우리에게 필요한건 Database ID, application token, keyspace 이다.

사실 위 링크에서 하라는대로 하면 된다

회원가입

링크에서 회원가입을 진행한다.

DB 생성

  1. 링크에서 좌측에 Database를 누른다.
  2. 우하단의 검은색 버튼 Create Database를 클릭한다.


3. 여기서 Serverless(Vector)를 선택하고 DB 이름을 설정한다. 이름을 한번 설정하면 변경이 불가하니 신중하게 생성한다.
Provider마다 조금씩 region이 다르긴 한데 Lock 되어있는건 비용을 지불해야한다. Asia Region이 없으니 어쩔 수 없이 US east를 선택한다. 설정이 완료되면 Create Database를 클릭한다.
4. Database가 active 상태가 된 이후 우측에 Application Tokens 탭의 Create Token을 눌러서 토큰을 생성한다. Token이 생성되면 다시는 확인할 길이 없기 때문에 clipboard에 복사 후 안전한 곳에 저장하자. 해당 토큰은 자동으로 데이터베이스 관리자 역할을 부여받는다. 자세한 사항은 여기
5. 이후 Overview 옆의 Data Explorer를 클릭한다. 현재 Namespace는 "default_keyspace"인데 원하는 Namespace를 생성하면 된다. 생성하는데 시간이 조금 걸린다.

실습

앞서 얻은 정보를 토대로 아래 코드에 정보를 삽입후 실행한다.

import cassio

cassio.init(
    database_id="Your database ID",
    token="Your application token",
    keyspace="Your key space",
)

이후 Cassandra vector store를 Standard index analyzer로 생성한다. 자세한 내용은 이 링크

Vector Store 실험

from cassio.table.cql import STANDARD_ANALYZER
from langchain_community.vectorstores import Cassandra
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Cassandra(
    embedding=embeddings,
    table_name="test_hybrid",
    body_index_options=[STANDARD_ANALYZER],
    session=None,
    keyspace=None,
)

vectorstore.add_texts(
    [
        "In 2023, I visited Paris",
        "In 2022, I visited New York",
        "In 2021, I visited New Orleans",
    ]
)

생성 후 결과는 아래와 같다

 'ee2a6192e7524ff49c95bd7f82e73d36',
 '0ef8aa454fe24569a6363436f8bddca3']
['271c225aaace4586b1c53e32f170013b',
 'ee2a6192e7524ff49c95bd7f82e73d36',
 '0ef8aa454fe24569a6363436f8bddca3']

이제 일반적인 유사도 검색을 하면 모든 문서가 검색되는 것을 확인할 수 있다.

vectorstore.as_retriever().invoke("What city did I visit last?")

결과는 아래와 같다.

[Document(page_content='In 2022, I visited New York'),
 Document(page_content='In 2023, I visited Paris'),
 Document(page_content='In 2021, I visited New Orleans')]

근데 Astra DB vector store의 body search argument를 넣으면 검색 결과를 필터링 할 수 있다.

가령, New라는 keyword가 들어간 도시 만을 찾고 싶다면 아래와 같이 적용할 수 있다.

vectorstore.as_retriever(search_kwargs={"body_search": "new"}).invoke(
    "What city did I visit last?"
)

결과는 다음과 같다.

[Document(page_content='In 2022, I visited New York'),
 Document(page_content='In 2021, I visited New Orleans')]

QA chain 생성

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import(
    ConfigurableField,
    RunnablePassthrough,
)
from langchain_openai import ChatOpenAI
  • Prompt Template, model, retriever를 정의한다.
    template = """Answer the question based only on the following context:
    {context}
    Question: {question}
    """
    

prompt = ChatPromptTemplate.from_template(template)

model = ChatOpenAI()

retriever = vectorstore.as_retriever()


- 여기서 Configurable field를 추가한 retriever를 생성한다. 모든 Vector store retriever들은 `search_kwargs`를 field로 가진다.
    Vector store 특정 field가 저장된 딕셔너리 타입이다.

```python
configurable_retriever = retriever.configurable_fields(
    search_kwargs=ConfigurableField(
        id='search_kwargs',
        name='Search Kwargs',
        description='The search Kwargs to use',
    )
)

이제 chain을 구성한다.

chain = (
    {"context" : configurable_retriever, "question" : RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

실험

cofigurable field에 값을 넣지 않고 invoke 하는 경우 결과

chain.invoke("What city did I visit last?")

결과

'Paris'

하지만 configurable field에 값을 넣은 후에는

chain.invoke(
    "What city did I visit last?",
    config={"configurable": {"search_kwargs":{"body_search":"new"}}}
)

결과

'New Orleans'

'인공지능 > RAG' 카테고리의 다른 글

What is Elastic Search?  (0) 2024.06.03
Langchain - Ensemble Retriever  (1) 2024.05.31
MongoDB와 Gemma를 사용한 RAG 실습(LangChain X)  (0) 2024.05.28
Langchain - MessagesPlaceholder  (0) 2024.05.24
Langchain - ChatPromptTemplate  (0) 2024.05.23
728x90

설명

  • 사용 모델 : gemma
  • 목적 : Langchain 프레임워크 기반으로 오픈소스 LLM과 문서 임베딩을 직접 수행하여 vectore db에 저장한 후 RAG 시스템을 구현하는 것
  • 기술 스택
    • Langchian
    • mongo db
    • RAG
    • gemma

STEP 1 : 라이브러리 설치

  • PyMongo: MongoDB와 상호 작용하는 Python 라이브러리로, 클러스터에 연결하고 컬렉션 및 문서에 저장된 데이터를 쿼리하는 기능을 제공합니다.
  • Pandas: 효율적인 데이터 처리 및 분석을 위해 Python을 사용하는 데이터 구조를 제공합니다.
  • Hugging Face datasets: 오디오, 비전, 텍스트 데이터셋을 보유하고 있습니다.
  • Hugging Face Accelerate: GPU와 같은 하드웨어 가속기를 사용하는 코드 작성의 복잡성을 추상화합니다. Accelerate는 GPU 리소스에서 Gemma 모델을 활용하기 위해 사용됩니다.
  • Hugging Face Transformers: 사전 훈련된 모델 컬렉션에 접근을 제공합니다.
  • Hugging Face Sentence Transformers: 문장, 텍스트, 이미지 임베딩에 접근을 제공합니다.
!pip install datasets pandas pymongo sentence_transformers -q
!pip install -U transformers -q
# 아래 라이브러리는 GPU 사용시 설치
!pip install accelerate -q

STEP 2 : 데이터 준비

#데이터셋 로드
from datasets import load_dataset
import pandas as pd

dataset = load_dataset('AIatMongoDB/embedded_movies')

dataset_df = pd.DataFrame(dataset['train'])
dataset_df.head()
  • 우선 dropna를 사용해서 데이터의 "fullplot" 속성이 비어있지 않게 설정한다.
  • 두번째로 "plot_embedding" 속성을 제거한다. 그리고 gte-large 모델로 새로 임베딩을 생성해서 저장한다
#비어있는 데이터를 제거한다
dataset_df = dataset_df.dropna(subset=['fullplot'])
print('\nNumber of missing values in each column after removal')
print(dataset_df.isnull().sum())

#plot_embedding 제거
dataset_df = dataset_df.drop(columns = ["plot_embedding"])
dataset_df.head(5)

STEP 3 : Embedding 생성

  1. 임베딩 모델에 접근하기 위해서 SentenceTransformers import
  2. SentenceTransformers를 사용해서 gte_large 임베딩 모델 로드
  3. get_embedding 함수 정의
    • 텍스트 문자열을 입력으로 받아서 임베딩을 출력하는 함수
    • 우선 입력 텍스트가 비어있는지 확인 후에 비어있으면 빈 리스트를 반환하고 아니면 임베디
  4. "fullplot" 열의 값을 get_embedding 함수에 넣어서 각 영화마다 임베딩을 생성한다. 새로 생성된 임베딩값들은 새로운 열에 할당된다.
from sentence_transformers import SentenceTransformer
embedding_model = SentenceTransformer("thenlper/gte-large")

def get_embedding(text:str) -> list[float]:
    if not text.strip():
        print("Attempted to get embedding for empty text.")
        return []

    embedding = embedding_model.encode(text)

    return embedding.tolist()

dataset_df["embedding"] = dataset_df["fullplot"].apply(get_embedding)

dataset_df.head()

STEP 4 : 데이터베이스 셋업 및 연결

  • mngo db는 일반적인 디비와 벡터 디비 역할 모두 수행. 효율적으로 저장하고 query하고 벡터임베딩을 검색 -> 데이터베이스 관리, 유지, 비용
  • 새로운 MongoDB 데이터베이스를 생성하기 위해서 데이터베이스 클러스터를 셋업해야한다.
  1. 다음 링크로 들어가서 무료 MongoDB 회원가입을 한다. 👉 링크
  2. 'Database' 옵션을 선택하면 현존하는 클러스터들의 배포 정보가 있는 데이터베이스 배포 페이지로 이동한다. 'build cluster' 버튼을 눌러서 새로운 데이터베이스 클러스터를 생성한다
  3. 데이터 베이스에 적용 가능한 모든 configuration을 선택하고 모든 configuration 옵션을 선택했으면 'Create Cluster' 버튼을 클릭해서 새로 생성한 클러스터를 배포한다. MongoDB는 shared tab을 통해서 무료 클러스터 생성도 지원한다.

Concept를 생성할때 Python host의 ip를 whitelist에 추가하거나(열어두거나), 0.0.0.0/0으로 ip를 설정해야한다.

  1. 클러스터 생성 및 배포 후 데이터베이스 배포 페이지에서 클러스터를 접근할 수 있다.
  2. 클러스터의 'connect' 버튼을 눌러서 다양한 언어 driver를 통해 연결하기 위한 옵션을 볼 수 있다. 여기서 driver를 선택하면 URI를 확인할 수 있다.
  3. 이 실습은 cluster의 URI만 필요하다. 이를 google colab 환경변수로 설정해주면 된다. 변수명은 MONGO_URI

step 4.1 데이터베이스 및 콜렉션 셋업

사전 준비할 사항을 미리 확인하자

  • MongoDB Atlas를 위한 Database cluster 설정
  • 클러스터 URI 확보
    클러스터를 생성한 후에는 MongoDB Atlas cluster 내부에 데이터베이스와 콜렉션을 만들어야한다.
    좌측에서 데이터베이스 클릭 > browse collection > create 를 눌러서 생성할 수 있다. Database : movies, collection : movie_collection_2이다.

STEP 5 : 벡터 검색 인덱스 생성

  • 이 과정에서는 MongoDB Atlas에 벡터 인덱스가 생성이 되어야한다.

  • 다음 단계는 정확하고 효율적인 벡터 기반 검색을 위한 필수 과정이다.

  1. database collection에 들어가서 search index를 누른다
  2. Atlas Vector Search의 JSON Editor를 누른다
  3. 아래 json 형식대로 생성한다.
  4. 기다리면 status가 Not Ready에서 Active로 바뀐다
  • 벡터 검색 인덱스를 만들면 문서를 효율적으로 탐색해서 벡터 유사도 기반으로 쿼리 임베딩과 가장 유사도가 높은 임베딩을 가진 문서를 검색한다.
    벡터 검색 인덱스에 대한 자세한 정보는 여기
    {
    "fields": [{
       "numDimensions": 1024,
       "path": "embedding",
       "similarity": "cosine",
       "type": "vector"
     }]
    }
  • numDimension 필드의 값 1024는 gte-large 임베딩 모델이 생성하는 차원에 해당합니다. gte-basegte-small 모델을 사용하게 되면 numDimension 값이 각각 768이나 384가 된다.

STEP 6 : 데이터 연결 생성

아래 코드를 통해서 PyMongo를 활용해서 MongoDB 클라이언트 객체를 생성한다. 이를 통해 클러스터와 연결하고 데이터베이스와 콜렉션에 접근할 수 있다.

import pymongo
from google.colab import userdata

def get_mongo_client(mongo_url):
    """ Establish conncetion to the Mongo DB"""
    try:
        client = pymongo.MongoClient(mongo_uri)
        print("Connection to MongoDB successful")

        return client
    except pymongo.errors.ConnectionFailure as e:
        print(f"Connection failed {e}")
        return None

mongo_uri = userdata.get("MONGO_URI")
if not mongo_uri:
    print("MONGO URI not set in environment variables")

mongo_client = get_mongo_client(mongo_uri)

#ingest data into Mongo DB
db = mongo_client["movies"]
collection = db["movie_collection_2"]
#Delete any existing records in the collection
collection.delete_many({})
documents = dataset_df.to_dict("records")
collection.insert_many(documents)

print("Data ingestion into MongoDB completed")

STEP 7 : 사용자의 질문에 벡터 검색 수행

다음 단계는 쿼리 임베딩을 생성하고 MongoDB 검색 파이프라인을 정의해서 벡터 검색 결과를 반환하는 함수를 구현합니다.

파이프라인은 Vector Search 단계와 Project 단계로 구성되어 있고 생성된 벡터로 쿼리를 실행하면서 결과에 대한 검색 점수를 통합하는 동시에 줄거리, 제목, 장르와 같이 필요한 정보 만을 포함하도록 결과 형식을 지정합니다.

def vector_search(user_query, collection):
    """
    Perform a vector search in the MongoDB collection based on the user query.

    Args:
    user_query (str): The user's query string.
    collection (MongoCollection): The MongoDB collection to search.

    Returns:
    list: A list of matching documents.
    """

    # Generate embedding for the user query
    query_embedding = get_embedding(user_query)

    if query_embedding is None:
        return "Invalid query or embedding generation failed"

    # Define the vector search pipeline
    pipeline = [
        {
            "$vectorSearch": {
                "index": "vector_index",
                "queryVector": query_embedding,
                "path": "embedding",
                "numCandidates": 150,  # Number of candidate matches to consider
                "limit": 4,  # Return top 4 matches
            }
        },
        {
            "$project": {
                "_id": 0,  # Exclude the _id field
                "fullplot": 1,  # Include the plot field
                "title": 1,  # Include the title field
                "genres": 1,  # Include the genres field
                "score": {"$meta": "vectorSearchScore"},  # Include the search score
            }
        },
    ]

    # Excute the search
    results = collection.aggregate(pipeline)
    return list(results)

STEP 8 : 사용자 쿼리 처리 및 Gemma 로드

  • 구글 gemma를 사용하려면 huggingface 사이트에서 인가를 받아야한다. 👉링크
  • 인가를 받은 후 huggingface의 access token을 코랩 환경변수로 설정 한 후 모델 로드시 매개변수로 전달한다.
def get_search_result(query, collection):
    get_knowledge = vector_search(query, collection)

    search_result = ""
    for result in get_knowledge:
        search_result += f"Title: {result.get('title', 'N/A')}, Plot: {result.get('fullplot','N/A')}\n"

        return search_result
# Conduct query with retival of sources
query = "What is the best romantic movie to watch and why?"
source_information = get_search_result(query,collection)
combined_information = f"Query: {query}\nContinue to answer the query by using the Search Results:\n{source_information}."

print(combined_information)
import os
from google.colab import userdata
HUGGINGFACE_API_KEY = userdata.get('HUGGINGFACE_API_KEY')
from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("google/gemma-2b-it", token = HUGGINGFACE_API_KEY)
#CPU를 사용하는 경우 아래 주석을 해제
# model = AutoModelForCausalLM.from_pretrained("google/gemma-2b-it")
#GPU를 사용하는 경우 아래 주석을 해제
model = AutoModelForCausalLM.from_pretrained("google/gemma-2b-it",token = HUGGINGFACE_API_KEY, device_map = "auto")

결과

Query: What is the best romantic movie to watch and why?
Continue to answer the query by using the Search Results:
Title: Shut Up and Kiss Me!, Plot: Ryan and Pete are 27-year old best friends in Miami, born on the same day and each searching for the perfect woman. Ryan is a rookie stockbroker living with his psychic Mom. Pete is a slick surfer dude yet to find commitment. Each meets the women of their dreams on the same day. Ryan knocks heads in an elevator with the gorgeous Jessica, passing out before getting her number. Pete falls for the insatiable Tiara, but Tiara's uncle is mob boss Vincent Bublione, charged with her protection. This high-energy romantic comedy asks to what extent will you go for true love?
# GPU로 텐서 이동
input_ids= tokenizer(combined_information, return_tensors="pt").to("cuda")
response = model.generate(**input_ids, max_new_tokens=500)
print(tokenizer.decode(response[0]))

결과

<bos>Query: What is the best romantic movie to watch and why?
Continue to answer the query by using the Search Results:
Title: Shut Up and Kiss Me!, Plot: Ryan and Pete are 27-year old best friends in Miami, born on the same day and each searching for the perfect woman. Ryan is a rookie stockbroker living with his psychic Mom. Pete is a slick surfer dude yet to find commitment. Each meets the women of their dreams on the same day. Ryan knocks heads in an elevator with the gorgeous Jessica, passing out before getting her number. Pete falls for the insatiable Tiara, but Tiara's uncle is mob boss Vincent Bublione, charged with her protection. This high-energy romantic comedy asks to what extent will you go for true love?
.

The best romantic movie to watch is **Shut Up and Kiss Me!** because it perfectly captures the essence of romantic comedies. The movie perfectly balances humor, heart, and romance, making it a delightful and unforgettable viewing experience.<eos>

REFERENCE https://github.com/huggingface/cookbook/blob/main/notebooks/en/rag_with_hugging_face_gemma_mongodb.ipynb

'인공지능 > RAG' 카테고리의 다른 글

Langchain - Ensemble Retriever  (1) 2024.05.31
Langchain - Hybrid Search 구현  (0) 2024.05.31
Langchain - MessagesPlaceholder  (0) 2024.05.24
Langchain - ChatPromptTemplate  (0) 2024.05.23
LangChain (9) Retrieval - Retriever  (0) 2024.04.08

+ Recent posts