728x90

Runnable

Runnable이란 invoked, batched, streamed, transformed and composed 될 수 있는 일의 단위이다.

  • invoke/ainvoke: 하나의 input을 output으로 변경시켜주는 메서드
  • batch/abatch: 여러 개의 input을 output으로 변경시켜주는 메서드
  • stream/astream: 결과물을 stream 해준다.
  • astream_log: 출력 및 입력에서 선택된 중간 결과를 Stream 해준다.

Runnable을 구성하는 주요 primitive(구성 요소)는 RunnableSequence and RunnableParallel이다.

  • RunnableSequence는 한 Runnable의 결과물을 다음 과정의 input으로 사용함으로써 Runnable들을 순차적으로 invoke한다.
    • | 연산자를 사용하거나 Runnable의 list를 통해서 구성한다.
  • RunnableParallel은 동일한 input을 여러 Runnable들에 전달함으로써 Runnable들을 동시에 invoke한다.
    • dict 타입을 활용해서 구성한다.

실습

1. Runnable Sequence

from langchain_core.runnables import RunnableLambda

# | 연산자를 통해 RunnableSequence를 구성하였다
sequence = RunnableLambda(lambda x : x + 1) | RunnableLambda(lambda x : x * 2)
sequence.invoke(1)

1이 들어가서 RunnableLambda에 의해 1+1 연산이 먼저 이루어지고

그 결과인 2가 두번째 RunnableLambda의 입력값으로 들어가서 2 * 2 연산이 이루어진다.

최종 결과는 4

sequence.batch([1,2,3])

앞선 과정이 병렬적으로 동시에 일어난다.

2. Runnable Parallel

# A sequence that contains a RunnableParallel constructed using a dict literal
sequence = RunnableLambda(lambda x: x + 1) | {
    'mul_2': RunnableLambda(lambda x: x * 2),
    'mul_5': RunnableLambda(lambda x: x * 5)
}
sequence.invoke(1)

위 코드는 우선 첫번째 RunnableLambda가 수행된 이후 2가지 RunnableLambda가 병렬적으로 수행된다.

이런 특성을 활용하면 서로 다른 2가지 Retriever를 동시에 적용하여 문서를 검색할 수 있다.

가령 lexical search(키워드 기반)와 semantic search(유사도 기반)를 따로 수행하고 검색한 문서를 기반으로 ReRanking을 수행해서 LLM의 최종 답변을 얻을 수 있다.

3. Runnable Method 사용하기

모든 Runnable은 동작을 수정(실패시 재시도, configurable하게 만들기)하는 데 사용할 수 있는 추가 메서드가 있다.

from langchain_core.runnables import RunnableLambda
import random

def add_one(x: int) -> int:
    return x + 1

def buggy_double(y: int) -> int:
    # 70%의 확률로 fail하는 buggy code
    if random.random() > 0.3 :
        print("This code failed, will probably be retried")
        raise ValueError("Triggerd Buggy Code!")
    return y * 2

sequence = (
    RunnableLambda(add_one) |
    RunnableLambda(buggy_double).with_retry(    # 실패시 재시도하는 메서드
        stop_after_attempt=10,
        wait_exponential_jitter=False
    )
)

print(sequence.input_schema.schema())
print(sequence.output_schema.schema())
print(sequence.invoke(2))

위 코드를 살펴보면 add_one함수는 1을 더해주고 buggy_double 함수는 random 값을 구해서 0.3 이상인 경우
ValueError를 raise하는 코드이다.

RunnableSequence 객체는 input에 대해 add_one 함수를 적용시키고 그 결과를 다시 input으로

buggy_double 함수를 실행하는데 with_retry 메서드를 실행하는데 이 함수는 stop_after_attempt

통해서 몇 번 retry할지 설정할 수 있고 재시도 대기 시간에 wait_exponential_jitter를 통해 jitter를 사용 할 지 설정 할 수 있다.

 

Attributes

InputType The type of input this runnable accepts specified as a type annotation.
OutputType The type of output this runnable produces specified as a type annotation.
config_specs List configurable fields for this runnable.
input_schema The type of input this runnable accepts specified as a pydantic model.
name The name of the runnable.
output_schema The type of output this runnable produces specified as a pydantic model.

Methods

__init__()
abatch(inputs[, config, return_exceptions]) Default implementation runs ainvoke in parallel using asyncio.gather.
abatch_as_completed() Run ainvoke in parallel on a list of inputs, yielding results as they complete.
ainvoke(input[, config]) Default implementation of ainvoke, calls invoke from a thread.
assign(**kwargs) Assigns new fields to the dict output of this runnable.
astream(input[, config]) Default implementation of astream, which calls ainvoke.
astream_events(input[, config, ...]) [Beta] Generate a stream of events.
astream_log() Stream all output from a runnable, as reported to the callback system.
atransform(input[, config]) Default implementation of atransform, which buffers input and calls astream.
batch(inputs[, config, return_exceptions]) Default implementation runs invoke in parallel using a thread pool executor.
batch_as_completed() Run invoke in parallel on a list of inputs, yielding results as they complete.
bind(**kwargs) Bind arguments to a Runnable, returning a new Runnable.
config_schema(*[, include]) The type of config this runnable accepts specified as a pydantic model.
get_graph([config]) Return a graph representation of this runnable.
get_input_schema([config]) Get a pydantic model that can be used to validate input to the runnable.
get_name([suffix, name]) Get the name of the runnable.
get_output_schema([config]) Get a pydantic model that can be used to validate output to the runnable.
get_prompts([config])
invoke(input[, config]) Transform a single input into an output.
map() Return a new Runnable that maps a list of inputs to a list of outputs, by calling invoke() with each input.
pick(keys) Pick keys from the dict output of this runnable.
pipe(*others[, name]) Compose this Runnable with Runnable-like objects to make a RunnableSequence.
stream(input[, config]) Default implementation of stream, which calls invoke.
transform(input[, config]) Default implementation of transform, which buffers input and then calls stream.
with_config([config]) Bind config to a Runnable, returning a new Runnable.
with_fallbacks(fallbacks, *[, ...]) Add fallbacks to a runnable, returning a new Runnable.
with_listeners(*[, on_start, on_end, on_error]) Bind lifecycle listeners to a Runnable, returning a new Runnable.
with_retry(*[, retry_if_exception_type, ...]) Create a new Runnable that retries the original runnable on exceptions.
with_types(*[, input_type, output_type]) Bind input and output types to a Runnable, returning a new Runnable.

 

 

출처 : 링크

'인공지능 > RAG' 카테고리의 다른 글

LangChain - Routing Chain  (1) 2024.06.04
LangChain - Dynamic Chain  (1) 2024.06.04
LangChain - RunnablePassTrough  (0) 2024.06.04
LangChain (11) 오픈소스 LLM으로 RAG 구현  (0) 2024.06.04
LangChain (10) Gemini로 RAG 구현  (0) 2024.06.04

+ Recent posts