RAG Tutorial

RAG - Document Indexing Flow

This workflow takes input PDF files, converts and splits them into text chunks, computes vector embeddings for each chunk, and stores the embeddings in a vector database (Pinecone, Elasticsearch, etc.).

Step-by-Step Guide

Import Necessary Libraries

from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Pinecone as PineconeConnection,
    Elasticsearch as ElasticsearchConnection
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import PineconeDocumentWriter, ElasticsearchDocumentWriter

Initialize the RAG Workflow

rag_wf = Workflow()

PyPDF Document Converter

Convert each PDF into text documents. With document_creation_mode="one-doc-per-page", every page becomes its own document.

converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
rag_wf.flow.add_nodes(converter)  # Add node to the DAG

Document Splitter

Split the documents into smaller chunks so that each chunk embeds and retrieves well. Here each chunk holds 10 sentences, and consecutive chunks share 1 overlapping sentence to preserve context across chunk boundaries.

document_splitter = DocumentSplitter(
    split_by="sentence",
    split_length=10,
    split_overlap=1,
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[converter.id]}.output.documents",
        },  # Map output of the previous node to the expected input of the current node
    ),
).depends_on(converter)
rag_wf.flow.add_nodes(document_splitter)
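
To make the splitter parameters concrete, here is a deliberately naive plain-Python sketch (not dynamiq's actual implementation) of sentence-window chunking with an overlap:

def split_sentences(text: str, split_length: int = 10, split_overlap: int = 1) -> list[str]:
    # Naive sentence segmentation on periods; real splitters are smarter.
    sentences = [s.strip() + "." for s in text.split(".") if s.strip()]
    step = split_length - split_overlap
    return [" ".join(sentences[i : i + split_length]) for i in range(0, len(sentences), step)]

split_sentences("A. B. C. D.", split_length=2, split_overlap=1)
# -> ['A. B.', 'B. C.', 'C. D.', 'D.']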

OpenAI Vector Embeddings

Convert the document chunks into vector embeddings using OpenAI.

embedder = OpenAIDocumentEmbedder(
    connection=OpenAIConnection(api_key="$OPENAI_API_KEY"),
    model="text-embedding-3-small",
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[document_splitter.id]}.output.documents",
        },
    ),
).depends_on(document_splitter)
rag_wf.flow.add_nodes(embedder)
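
The writer nodes below declare dimension=1536 because text-embedding-3-small returns 1536-dimensional vectors by default. If in doubt, this can be verified with the official openai package, independently of dynamiq:

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment
resp = client.embeddings.create(model="text-embedding-3-small", input="hello")
print(len(resp.data[0].embedding))  # 1536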

Vector Storage Options

You can choose between different vector stores for document storage. Here are examples for both Pinecone and Elasticsearch:

Option 1: Pinecone Vector Storage

Store the vector embeddings in the Pinecone vector database.

vector_store = PineconeDocumentWriter(
    connection=PineconeConnection(api_key="$PINECONE_API_KEY"),
    index_name="default",
    dimension=1536,
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[embedder.id]}.output.documents",
        },
    ),
).depends_on(embedder)
rag_wf.flow.add_nodes(vector_store)

If the index does not exist yet and you want to create it programmatically, pass create_if_not_exist=True together with the additional index-creation parameters required by your deployment type.

If you have a serverless Pinecone deployment, your vector store initialization might look like this:

# Pinecone vector storage
vector_store = (
    PineconeDocumentWriter(
        connection=PineconeConnection(),
        index_name="quickstart",
        dimension=1536,
        create_if_not_exist=True,
        index_type="serverless",
        cloud="aws",
        region="us-east-1"
    )
    .inputs(documents=embedder.outputs.documents)
    .depends_on(embedder)
)
rag_wf.flow.add_nodes(vector_store)
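
Note the .inputs(documents=embedder.outputs.documents) helper used here: it appears to be shorthand for the explicit InputTransformer wiring shown earlier. Assuming the two forms are equivalent, the same node could be written as:

# Assumed-equivalent explicit wiring, in the style of the earlier examples
vector_store = PineconeDocumentWriter(
    connection=PineconeConnection(),
    index_name="quickstart",
    dimension=1536,
    create_if_not_exist=True,
    index_type="serverless",
    cloud="aws",
    region="us-east-1",
    input_transformer=InputTransformer(
        selector={"documents": f"${[embedder.id]}.output.documents"},
    ),
).depends_on(embedder)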

If you have a pod-based deployment, your vector store initialization could look like this:

# Pinecone vector storage
vector_store = (
    PineconeDocumentWriter(
        connection=PineconeConnection(),
        index_name="quickstart",
        dimension=1536,
        create_if_not_exist=True,
        index_type="pod",
        environment="us-west1-gcp",
        pod_type="p1.x1",
        pods=1
    )
    .inputs(documents=embedder.outputs.documents)
    .depends_on(embedder)
)
rag_wf.flow.add_nodes(vector_store)

Option 2: Elasticsearch Vector Storage

Store the vector embeddings in Elasticsearch.

For local setup:

vector_store = ElasticsearchDocumentWriter(
    connection=ElasticsearchConnection(
        url="$ELASTICSEARCH_URL",
        api_key="$ELASTICSEARCH_API_KEY",
    ),
    index_name="documents",
    dimension=1536,
    similarity="cosine",
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[embedder.id]}.output.documents",
        },
    ),
).depends_on(embedder)
rag_wf.flow.add_nodes(vector_store)

For Elastic Cloud deployment:

vector_store = ElasticsearchDocumentWriter(
    connection=ElasticsearchConnection(
        username="$ELASTICSEARCH_USERNAME",
        password="$ELASTICSEARCH_PASSWORD",
        cloud_id="$ELASTICSEARCH_CLOUD_ID",
    ),
    index_name="documents",
    dimension=1536,
    create_if_not_exist=True,
    index_settings={
        "number_of_shards": 1,
        "number_of_replicas": 1
    },
    mapping_settings={
        "dynamic": "strict"
    }
).depends_on(embedder)
rag_wf.flow.add_nodes(vector_store)

Prepare Input PDF Files

Prepare the input PDF files for processing.

file_paths = ["example.pdf"]
input_data = {
    "files": [
        # Read each PDF fully into memory so the converter can consume it as a file-like object
        BytesIO(open(path, "rb").read()) for path in file_paths
    ],
    "metadata": [
        {"filename": path} for path in file_paths  # keep the source filename alongside each file
    ],
}
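
If you would rather index every PDF in a folder than a hard-coded list, build file_paths with the standard library before constructing input_data (the "docs/" directory below is a placeholder):

from pathlib import Path

file_paths = [str(p) for p in Path("docs/").glob("*.pdf")]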

Run RAG Indexing Flow

Execute the workflow to process and store the documents.

result = rag_wf.run(input_data=input_data)
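
As in the retrieval flow at the end of this tutorial, the result's output is keyed by node id, so the writer's output can be inspected directly (the status attribute is an assumption to verify against the result object):

print(result.status)                        # assumed: overall run status
print(result.output.get(vector_store.id))  # the writer node's output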

RAG - Document Retrieval Flow

This retrieval flow embeds the user question, searches the vector store for relevant documents, and answers the question using the retrieved context.

Step-by-Step Guide

Import Necessary Libraries

from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Pinecone as PineconeConnection,
    Elasticsearch as ElasticsearchConnection
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import PineconeDocumentRetriever, ElasticsearchDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt

Initialize the RAG Retrieval Workflow

retrieval_wf = Workflow()

Shared OpenAI Connection

Set up a shared connection to OpenAI.

openai_connection = OpenAIConnection(api_key="$OPENAI_API_KEY")

OpenAI Text Embedder for Query Embedding

Embed the user query into a vector so it can be matched against the stored document embeddings.

embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)
retrieval_wf.flow.add_nodes(embedder)

Document Retriever Options

You can choose between different retrievers. Here are examples for both Pinecone and Elasticsearch:

Option 1: Pinecone Document Retriever

document_retriever = PineconeDocumentRetriever(
    connection=PineconeConnection(api_key="$PINECONE_API_KEY"),
    index_name="default",
    dimension=1536,
    top_k=5,
    input_transformer=InputTransformer(
        selector={
            "embedding": f"${[embedder.id]}.output.embedding",
        },
    ),
).depends_on(embedder)
retrieval_wf.flow.add_nodes(document_retriever)

Option 2: Elasticsearch Document Retriever

For local setup:

# Vector similarity search with Elasticsearch
document_retriever = ElasticsearchDocumentRetriever(
    connection=ElasticsearchConnection(
        url="$ELASTICSEARCH_URL",
        api_key="$ELASTICSEARCH_API_KEY",
    ),
    index_name="documents",
    top_k=5,
    input_transformer=InputTransformer(
        selector={
            "query": f"${[embedder.id]}.output.embedding",  # Vector query for similarity search
        },
    ),
).depends_on(embedder)
retrieval_wf.flow.add_nodes(document_retriever)

For cloud deployment with score normalization:

document_retriever = ElasticsearchDocumentRetriever(
    connection=ElasticsearchConnection(
        username="$ELASTICSEARCH_USERNAME",
        password="$ELASTICSEARCH_PASSWORD",
        cloud_id="$ELASTICSEARCH_CLOUD_ID",
    ),
    index_name="documents",
    top_k=5,
    scale_scores=True,  # Scale scores to 0-1 range
    input_transformer=InputTransformer(
        selector={
            "query": f"${[embedder.id]}.output.embedding",  # Vector query for similarity search
        },
    ),
).depends_on(embedder)
retrieval_wf.flow.add_nodes(document_retriever)

Define the Prompt Template

Create a template for generating answers based on the retrieved documents.

prompt_template = """
Please answer the question based on the provided context.

Question: {{ query }}

Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""

OpenAI LLM for Answer Generation

Generate an answer to the user query using OpenAI's language model.

prompt = Prompt(messages=[Message(content=prompt_template, role="user")])

answer_generator = OpenAI(
    connection=openai_connection,
    model="gpt-4o",
    prompt=prompt,
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[document_retriever.id]}.output.documents",
            "query": f"${[embedder.id]}.output.query",
        },  # Take documents from the vector store node and query from the embedder
    ),
).depends_on([embedder, document_retriever])
retrieval_wf.flow.add_nodes(answer_generator)

Run the RAG Retrieval Flow

Execute the workflow to retrieve and answer the user query.

question = "What are the line items provided in the invoice?"
result = retrieval_wf.run(input_data={"query": question})

# Print the answer
answer = result.output.get(answer_generator.id, {}).get("output", {}).get("content")
print(answer)
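
For debugging, the retrieved context can be read from the retriever node's output in the same keyed-by-node-id fashion (whether each entry is a plain dict or a Document object is an assumption to verify):

retrieved = result.output.get(document_retriever.id, {}).get("output", {}).get("documents", [])
for doc in retrieved:
    print(doc["content"] if isinstance(doc, dict) else doc.content)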