Create a RAG AI Agent with PydanticAI, PostgreSQL and OpenAI: Step-by-Step Guide

Skolo Online Learning
6 min read · Jan 13, 2025


In this article, I will show you step by step how to create a RAG (Retrieval Augmented Generation) application with PydanticAI. The code is simpler and cleaner than a manual implementation of RAG, and we completed the entire build in about an hour in the accompanying video.

You can also read this article and get the full code on GitHub: https://github.com/skolo-online/pydantic-ai-RAG

Prerequisites:

PydanticAI RAG Agent

Before you get started you will need the following:

  1. A PostgreSQL database with the pgvector extension available, plus its connection string (DSN)
  2. An OpenAI API key
  3. Python with the packages used in the code below: pydantic-ai, asyncpg, openai, httpx, PyMuPDF (imported as fitz) and streamlit

Database Set Up

We need to set up our database, get the connection string and create a clean table using the schema below:

DB_SCHEMA = """
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS text_chunks (
id serial PRIMARY KEY,
chunk text NOT NULL,
embedding vector(1536) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_text_chunks_embedding ON text_chunks USING hnsw (embedding vector_l2_ops);
"""

The rest of the code for the database is shown below:

from __future__ import annotations as _annotations

from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import List
import pydantic_core

import asyncpg
import httpx
import fitz
import json
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIModel

from openai import AsyncOpenAI

DB_DSN = "database-dsn-goes-here"
OPENAI_API_KEY = "sk-proj-your-api-key-goes-here"

@asynccontextmanager
async def database_connect(create_db: bool = False):
    """Manage database connection pool."""
    pool = await asyncpg.create_pool(DB_DSN)
    try:
        if create_db:
            async with pool.acquire() as conn:
                await conn.execute(DB_SCHEMA)
        yield pool
    finally:
        await pool.close()


class Chunk(BaseModel):
    chunk: str


async def split_text_into_chunks(text: str, max_words: int = 400, overlap: float = 0.2) -> List[Chunk]:
    """Split long text into smaller chunks based on word count with overlap."""
    words = text.split()
    chunks = []
    step_size = int(max_words * (1 - overlap))

    for start in range(0, len(words), step_size):
        end = start + max_words
        chunk_words = words[start:end]
        if chunk_words:
            chunks.append(Chunk(chunk=" ".join(chunk_words)))

    return chunks


async def insert_chunks(pool: asyncpg.Pool, chunks: List[Chunk], openai_client: AsyncOpenAI):
    """Insert text chunks into the database with embeddings."""
    for chunk in chunks:
        embedding_response = await openai_client.embeddings.create(
            input=chunk.chunk,
            model="text-embedding-3-small"
        )

        # Extract embedding data and convert to JSON format
        assert len(embedding_response.data) == 1, f"Expected 1 embedding, got {len(embedding_response.data)}"
        embedding_data = json.dumps(embedding_response.data[0].embedding)

        # Insert into the database
        await pool.execute(
            'INSERT INTO text_chunks (chunk, embedding) VALUES ($1, $2)',
            chunk.chunk,
            embedding_data
        )


async def download_pdf(url: str) -> bytes:
    """Download PDF from a given URL."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.content


def extract_text_from_pdf(pdf_content: bytes) -> str:
    """Extract text from PDF content."""
    document = fitz.open(stream=pdf_content, filetype="pdf")
    text = ""
    for page_num in range(document.page_count):
        page = document.load_page(page_num)
        text += page.get_text()
    return text


async def add_pdf_to_db(url: str):
    """Download PDF, extract text, and add to the embeddings database."""
    openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    pdf_content = await download_pdf(url)
    text = extract_text_from_pdf(pdf_content)
    async with database_connect(create_db=True) as pool:
        chunks = await split_text_into_chunks(text)
        await insert_chunks(pool, chunks, openai_client)


async def update_db_with_pdf(url: str):
    """Download PDF, extract text, and update the embeddings database."""
    openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    pdf_content = await download_pdf(url)
    text = extract_text_from_pdf(pdf_content)
    async with database_connect() as pool:
        chunks = await split_text_into_chunks(text)
        await insert_chunks(pool, chunks, openai_client)


async def execute_url_pdf(url: str):
    """
    Check if the database table exists, and call the appropriate function
    to handle the PDF URL.
    """
    async with database_connect() as pool:
        table_exists = await pool.fetchval("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'text_chunks'
            )
        """)

    if table_exists:
        # If the table exists, update the database
        print("Table exists. Updating database with PDF content.")
        await update_db_with_pdf(url)
    else:
        # If the table does not exist, add the PDF and create the table
        print("Table does not exist. Adding PDF and creating the table.")
        await add_pdf_to_db(url)

The code above will do the following:

  1. Connect to the database using the DB_DSN string you have provided and, if the table does not yet exist, create it according to the schema.
  2. Take a PDF document and extract the text from it.
  3. Split the text into chunks with a 20% overlap.
  4. Create embeddings for the chunks using the OpenAI embeddings model.
  5. Save the embeddings into the database alongside the chunks.

This code also allows you to add new PDF documents to the same table if you need to, so you can upload multiple documents.
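
To ingest a document, call execute_url_pdf from a small script. A minimal example, assuming the code above is saved as aiagent.py (the module the Streamlit app imports later) and using a placeholder URL:

import asyncio

from aiagent import execute_url_pdf

# Placeholder URL - replace with a real, publicly accessible PDF link
asyncio.run(execute_url_pdf("https://example.com/document.pdf"))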

💡 Idea: This code only allows for PDF uploads for now. You could expand it by adding functions to extract the content of other document types, e.g. Word documents, Excel spreadsheets, PowerPoint presentations, etc.
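
For example, here is a minimal sketch of a Word-document equivalent of extract_text_from_pdf, assuming the python-docx package is installed (pip install python-docx):

from docx import Document  # from the python-docx package

def extract_text_from_docx(docx_path: str) -> str:
    """Extract text from a Word document, paragraph by paragraph."""
    document = Document(docx_path)
    return "\n".join(paragraph.text for paragraph in document.paragraphs)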

🚀 Improve the code by using more sophisticated chunking methods to preserve context and optimise the RAG output.
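
One simple direction is to avoid cutting chunks mid-sentence. The naive sketch below splits on sentence boundaries with a regex and then groups whole sentences up to the word limit; a production system might use a proper sentence tokeniser instead:

import re

def split_on_sentences(text: str, max_words: int = 400) -> list[str]:
    """Group whole sentences into chunks instead of cutting mid-sentence."""
    sentences = re.split(r"(?<=[.!?])\s+", text)
    chunks, current, count = [], [], 0
    for sentence in sentences:
        n_words = len(sentence.split())
        # Flush the current chunk before it would exceed the word limit
        if current and count + n_words > max_words:
            chunks.append(" ".join(current))
            current, count = [], 0
        current.append(sentence)
        count += n_words
    if current:
        chunks.append(" ".join(current))
    return chunks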

PydanticAI RAG Agent Code

Add the code below to create a PydanticAI agent with a retrieve tool. RAG is added to the agent simply as a retrieval tool, which means you can combine it with many other tools, such as the AI Agent CRUD tools we explored in the previous article.

@dataclass
class Deps:
    pool: asyncpg.Pool
    openai: AsyncOpenAI


# Initialize the agent
model = OpenAIModel("gpt-4o", api_key=OPENAI_API_KEY)
rag_agent = Agent(model, deps_type=Deps)


@rag_agent.tool
async def retrieve(context: RunContext[Deps], search_query: str) -> str:
    """Retrieve documentation sections based on a search query.

    Args:
        context: The call context.
        search_query: The search query.
    """
    print("Retrieving..............")
    embedding = await context.deps.openai.embeddings.create(
        input=search_query,
        model='text-embedding-3-small',
    )

    assert (
        len(embedding.data) == 1
    ), f'Expected 1 embedding, got {len(embedding.data)}, doc query: {search_query!r}'

    embedding = embedding.data[0].embedding
    embedding_json = pydantic_core.to_json(embedding).decode()
    rows = await context.deps.pool.fetch(
        'SELECT chunk FROM text_chunks ORDER BY embedding <-> $1 LIMIT 5',
        embedding_json,
    )
    return '\n\n'.join(
        f'# Chunk:\n{row["chunk"]}\n'
        for row in rows
    )


async def run_agent(question: str):
    """Entry point to run the agent and perform RAG-based question answering."""
    openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)

    # Set up the dependencies and run the agent
    async with database_connect() as pool:
        deps = Deps(openai=openai_client, pool=pool)
        base_instruction = f"Use the 'retrieve' tool to fetch information to help you answer this question: {question}"
        answer = await rag_agent.run(base_instruction, deps=deps)
        return answer.data

Our agent is called rag_agent and we initialise it with a dependency class that contains a database connection pool and an OpenAI client. These dependencies allow rag_agent to connect to the database and answer questions from the information stored there, using just the tool.

This is amazing because the AI agent decides for itself whether to call the tool; the retrieval happens in the background and we get the completed response, without having to perform multiple AI calls to first filter the database and then answer the question.
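
You can test the agent from a script in the same way, without the front-end; the question below is just an example:

import asyncio

from aiagent import run_agent

answer = asyncio.run(run_agent("What are the responsibilities of a full-stack developer?"))
print(answer)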

Front-End Application: Streamlit

The agent can be accessed through a front-end Streamlit application. The code for the app is shown below:

import streamlit as st
import asyncio
from aiagent import execute_url_pdf, run_agent

# Streamlit Page Configuration
st.set_page_config(
    page_title="AI Assistant 📚🤖",
    page_icon="📚",
    layout="wide"
)

# Title
st.title("AI Assistant 📚🤖")
st.write("Interact with your PDF-based AI assistant. Use the options below to upload a PDF or ask a question.")

# Layout with Two Columns
col1, col2 = st.columns(2)

# Column 1: PDF Upload via URL
with col1:
    st.subheader("📄 Upload PDF")
    pdf_url = st.text_input("Enter the URL of the PDF document:", placeholder="https://example.com/document.pdf")
    if st.button("📥 Add PDF to Database"):
        if pdf_url:
            with st.spinner("Processing the PDF and updating the database..."):
                try:
                    asyncio.run(execute_url_pdf(pdf_url))
                    st.success("PDF successfully processed and added to the database!")
                except Exception as e:
                    st.error(f"Error processing the PDF: {e}")
        else:
            st.warning("Please enter a valid URL.")

# Column 2: Ask a Question
with col2:
    st.subheader("❓ Ask a Question")
    question = st.text_input("Enter your question:", placeholder="What are the responsibilities of a full-stack developer?")
    if st.button("🔍 Get Answer"):
        if question:
            with st.spinner("Thinking..."):
                try:
                    answer = asyncio.run(run_agent(question))
                    st.success("Here's the answer:")
                    st.write(answer)
                except Exception as e:
                    st.error(f"Error getting the answer: {e}")
        else:
            st.warning("Please enter a valid question.")

# Footer
st.markdown("---")
st.write("✨ Powered by [Skolo Online](https://skolo.online) and Pydantic AI")

From the front-end you can:

  1. Upload PDF documents
  2. Ask the PydanticAI Agent questions about the PDFs you have uploaded

The front-end should look like so:

Streamlit PydanticAI RAG Application

Let me know your thoughts in the comments; all suggestions and improvements are welcome.
