Runnable
Runnable
이란 invoked, batched, streamed, transformed and composed 될 수 있는 일의 단위이다.
- invoke/ainvoke: 하나의 input을 output으로 변경시켜주는 메서드
- batch/abatch: 여러 개의 input을 output으로 변경시켜주는 메서드
- stream/astream: 결과물을 stream 해준다.
- astream_log: 출력 및 입력에서 선택된 중간 결과를 Stream 해준다.
Runnable
을 구성하는 주요 primitive(구성 요소)는 RunnableSequence
and RunnableParallel
이다.
RunnableSequence
는 한Runnable
의 결과물을 다음 과정의 input으로 사용함으로써Runnable
들을 순차적으로 invoke한다.- | 연산자를 사용하거나 Runnable의 list를 통해서 구성한다.
RunnableParallel
은 동일한 input을 여러Runnable
들에 전달함으로써Runnable
들을 동시에 invoke한다.- dict 타입을 활용해서 구성한다.
실습
1. Runnable Sequence
from langchain_core.runnables import RunnableLambda
# | 연산자를 통해 RunnableSequence를 구성하였다
sequence = RunnableLambda(lambda x : x + 1) | RunnableLambda(lambda x : x * 2)
sequence.invoke(1)
1이 들어가서 RunnableLambda
에 의해 1+1 연산이 먼저 이루어지고
그 결과인 2가 두번째 RunnableLambda
의 입력값으로 들어가서 2 * 2 연산이 이루어진다.
최종 결과는 4
sequence.batch([1,2,3])
앞선 과정이 병렬적으로 동시에 일어난다.
2. Runnable Parallel
# A sequence that contains a RunnableParallel constructed using a dict literal
sequence = RunnableLambda(lambda x: x + 1) | {
'mul_2': RunnableLambda(lambda x: x * 2),
'mul_5': RunnableLambda(lambda x: x * 5)
}
sequence.invoke(1)
위 코드는 우선 첫번째 RunnableLambda
가 수행된 이후 2가지 RunnableLambda
가 병렬적으로 수행된다.
이런 특성을 활용하면 서로 다른 2가지 Retriever를 동시에 적용하여 문서를 검색할 수 있다.
가령 lexical search(키워드 기반)와 semantic search(유사도 기반)를 따로 수행하고 검색한 문서를 기반으로 ReRanking을 수행해서 LLM의 최종 답변을 얻을 수 있다.
3. Runnable Method 사용하기
모든 Runnable은 동작을 수정(실패시 재시도, configurable하게 만들기)하는 데 사용할 수 있는 추가 메서드가 있다.
from langchain_core.runnables import RunnableLambda
import random
def add_one(x: int) -> int:
return x + 1
def buggy_double(y: int) -> int:
# 70%의 확률로 fail하는 buggy code
if random.random() > 0.3 :
print("This code failed, will probably be retried")
raise ValueError("Triggerd Buggy Code!")
return y * 2
sequence = (
RunnableLambda(add_one) |
RunnableLambda(buggy_double).with_retry( # 실패시 재시도하는 메서드
stop_after_attempt=10,
wait_exponential_jitter=False
)
)
print(sequence.input_schema.schema())
print(sequence.output_schema.schema())
print(sequence.invoke(2))
위 코드를 살펴보면 add_one
함수는 1을 더해주고 buggy_double
함수는 random 값을 구해서 0.3 이상인 경우
ValueError를 raise하는 코드이다.
RunnableSequence
객체는 input에 대해 add_one 함수를 적용시키고 그 결과를 다시 input으로
buggy_double
함수를 실행하는데 with_retry
메서드를 실행하는데 이 함수는 stop_after_attempt
를
통해서 몇 번 retry할지 설정할 수 있고 재시도 대기 시간에 wait_exponential_jitter
를 통해 jitter를 사용 할 지 설정 할 수 있다.
Attributes
InputType | The type of input this runnable accepts specified as a type annotation. |
OutputType | The type of output this runnable produces specified as a type annotation. |
config_specs | List configurable fields for this runnable. |
input_schema | The type of input this runnable accepts specified as a pydantic model. |
name | The name of the runnable. |
output_schema | The type of output this runnable produces specified as a pydantic model. |
Methods
__init__() | |
abatch(inputs[, config, return_exceptions]) | Default implementation runs ainvoke in parallel using asyncio.gather. |
abatch_as_completed() | Run ainvoke in parallel on a list of inputs, yielding results as they complete. |
ainvoke(input[, config]) | Default implementation of ainvoke, calls invoke from a thread. |
assign(**kwargs) | Assigns new fields to the dict output of this runnable. |
astream(input[, config]) | Default implementation of astream, which calls ainvoke. |
astream_events(input[, config, ...]) | [Beta] Generate a stream of events. |
astream_log() | Stream all output from a runnable, as reported to the callback system. |
atransform(input[, config]) | Default implementation of atransform, which buffers input and calls astream. |
batch(inputs[, config, return_exceptions]) | Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed() | Run invoke in parallel on a list of inputs, yielding results as they complete. |
bind(**kwargs) | Bind arguments to a Runnable, returning a new Runnable. |
config_schema(*[, include]) | The type of config this runnable accepts specified as a pydantic model. |
get_graph([config]) | Return a graph representation of this runnable. |
get_input_schema([config]) | Get a pydantic model that can be used to validate input to the runnable. |
get_name([suffix, name]) | Get the name of the runnable. |
get_output_schema([config]) | Get a pydantic model that can be used to validate output to the runnable. |
get_prompts([config]) | |
invoke(input[, config]) | Transform a single input into an output. |
map() | Return a new Runnable that maps a list of inputs to a list of outputs, by calling invoke() with each input. |
pick(keys) | Pick keys from the dict output of this runnable. |
pipe(*others[, name]) | Compose this Runnable with Runnable-like objects to make a RunnableSequence. |
stream(input[, config]) | Default implementation of stream, which calls invoke. |
transform(input[, config]) | Default implementation of transform, which buffers input and then calls stream. |
with_config([config]) | Bind config to a Runnable, returning a new Runnable. |
with_fallbacks(fallbacks, *[, ...]) | Add fallbacks to a runnable, returning a new Runnable. |
with_listeners(*[, on_start, on_end, on_error]) | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
with_retry(*[, retry_if_exception_type, ...]) | Create a new Runnable that retries the original runnable on exceptions. |
with_types(*[, input_type, output_type]) | Bind input and output types to a Runnable, returning a new Runnable. |
출처 : 링크
'인공지능 > RAG' 카테고리의 다른 글
LangChain - Routing Chain (1) | 2024.06.04 |
---|---|
LangChain - Dynamic Chain (1) | 2024.06.04 |
LangChain - RunnablePassTrough (0) | 2024.06.04 |
LangChain (11) 오픈소스 LLM으로 RAG 구현 (0) | 2024.06.04 |
LangChain (10) Gemini로 RAG 구현 (0) | 2024.06.04 |