Building a RAG Pipeline with MongoDB: Vector Search for Personalized Picks
This article explores the construction of a movie recommendation system using a Retrieval-Augmented Generation (RAG) pipeline. The objective is to learn how to harness the power of MongoDB’s vector search capabilities, transform movie descriptions into searchable digital fingerprints, and create a system that understands the nuances of both your preferences and the way you express them. In other words, we aim to build a recommendation system that’s not just smart, but also efficient.
By the end of this article, you’ll have built a functional movie recommendation system. This system will be able to take a user’s query, such as “I want to watch a good sci-fi movie that explores artificial intelligence” or “What is a good animated film that adults would enjoy too? What makes your suggestion a good fit?”, and return relevant movie suggestions along with the reasoning behind them.
What is a RAG Pipeline?
A RAG pipeline refers to the sequential flow of data through a series of processing steps that combines the strengths of large language models (LLMs) with structured data retrieval. It works by first retrieving relevant information from a knowledge base, and then using this information to augment the input of a large language model, which generates the final output. The primary objective of such a pipeline is to generate more accurate, contextually appropriate, and personalised responses to user-specific queries from vast databases.
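At a high level, that flow can be expressed in a few lines of Python. The sketch below is purely conceptual: retrieve, build_prompt, and llm_generate are hypothetical placeholders, not real APIs, and the rest of the article implements each stage concretely.
# Conceptual sketch of a RAG pipeline. The three helpers are hypothetical
# placeholders that the remaining steps of this article implement.
def rag_answer(user_query: str) -> str:
    # 1. Retrieval: find documents semantically related to the query
    context_docs = retrieve(user_query)            # e.g., a vector search
    # 2. Augmentation: combine the query with the retrieved context
    prompt = build_prompt(user_query, context_docs)
    # 3. Generation: let the LLM produce an answer grounded in that context
    return llm_generate(prompt)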
Why MongoDB?
MongoDB is an open-source NoSQL database that stores data in flexible, JSON-like documents, allowing for easy scalability and handling of diverse data types and structures. MongoDB plays a significant role in this project: its document model aligns well with our movie data, while its vector search capabilities enable similarity searches on our embeddings (i.e., the numerical representations of movie content). We can also take advantage of indexing and query optimisation features to maintain quick data retrieval even as the dataset expands.
Our Project
Here’s what our pipeline will look like:
- Set up the environment and load movie data from Hugging Face
- Model the data using Pydantic
- Generate embeddings for the movie information
- Ingest the data into a MongoDB database
- Create a Vector Search Index in MongoDB Atlas
- Perform vector search operations to find relevant movies
- Handle user queries with an LLM model
- Use the RAG pipeline to get a movie recommendation
Step 1: Setting Up the Environment and Loading the Dataset
First, we need to import the necessary libraries and set up our environment. This also involves setting up our API keys and the connection string that the application uses to connect to a MongoDB database:
import warnings
warnings.filterwarnings('ignore')
import os
from dotenv import load_dotenv, find_dotenv
from datasets import load_dataset
import pandas as pd
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime
from pymongo.mongo_client import MongoClient
import openai
import time
_ = load_dotenv(find_dotenv())
MONGO_URI = os.environ.get("MONGO_URI")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
openai.api_key = OPENAI_API_KEY
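Both values are read from environment variables, so a simple way to provide them locally is a .env file in the project root. The placeholder values below are illustrative only:
# .env (keep this file out of version control)
MONGO_URI="mongodb+srv://<user>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
OPENAI_API_KEY="sk-..."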
Next, we load our movie dataset:
dataset = load_dataset("Pablinho/movies-dataset", streaming=True, split="train")
dataset = dataset.take(200) # 200 movies for the sake of simplicity
dataset_df = pd.DataFrame(dataset)
The dataset contains more than 9000 entries. However, for this exercise, we’re limiting our dataset to 200 movies using dataset.take(200). In a real-world scenario, you’d likely use a much larger dataset.
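Before modeling the data, a quick sanity check confirms that the DataFrame has the columns we expect (the column names below match the fields we use throughout the article; this check is optional):
print(dataset_df.shape)             # expected: 200 rows
print(dataset_df.columns.tolist())  # e.g. ['Release_Date', 'Title', 'Overview', ...]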
Step 2: Modeling the Data with Pydantic
Data modeling is crucial for ensuring consistency and type safety in our application. Hence, we use Pydantic for this purpose:
class Movie(BaseModel):
    Release_Date: Optional[str]
    Title: str
    Overview: str
    Popularity: float
    Vote_Count: int
    Vote_Average: float
    Original_Language: str
    Genre: List[str]
    Poster_Url: str
    text_embeddings: List[float]
Using Pydantic provides several benefits, such as automatic data validation, type checking, and easy serialization/deserialization. Notice that we also created a text_embeddings field that will store our generated embeddings as a list of floats.
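As a quick illustration of the validation we get for free, constructing a Movie with a value of the wrong type raises a ValidationError instead of silently storing bad data. This is a minimal sketch with made-up values, not part of the pipeline:
from pydantic import ValidationError

try:
    Movie(
        Release_Date="2021-12-15",
        Title="Example Movie",
        Overview="A short synopsis.",
        Popularity="not-a-number",   # wrong type: triggers a ValidationError
        Vote_Count=100,
        Vote_Average=7.5,
        Original_Language="en",
        Genre=["Action"],
        Poster_Url="https://example.com/poster.jpg",
        text_embeddings=[0.0] * 1536,
    )
except ValidationError as e:
    print(e)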
Step 3: Embedding Generation
Now, we can use the OpenAI API and write a function for generating embeddings, as follows:
def get_embedding(text):
    if not text or not isinstance(text, str):
        return None
    try:
        embedding = openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
            dimensions=1536
        ).data[0].embedding
        return embedding
    except Exception as e:
        print(f"Error in get_embedding: {e}")
        return None
In the previous code lines, we first check if the input is valid (non-empty string). Then, we use OpenAI’s embeddings.create method to generate the embedding using the “text-embedding-3-small” model, which generates 1536-dimensional embeddings.
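A quick way to confirm the function behaves as expected is to embed a short string and check the vector length. This is just a sanity check, not part of the pipeline:
sample = get_embedding("A heartwarming story about a robot learning to love")
print(len(sample))  # should print 1536 for text-embedding-3-small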
Now, we can process each record and generate embeddings with the previous function. We also add some lines to process the ‘Genre’ field, converting it from a string (if it exists) to a list of genres.
def process_and_embed_record(record):
    for key, value in record.items():
        if pd.isnull(value):
            record[key] = None
    if record['Genre']:
        record['Genre'] = record['Genre'].split(', ')
    else:
        record['Genre'] = []
    text_to_embed = f"{record['Title']} {record['Overview']}"
    embedding = get_embedding(text_to_embed)
    record['text_embeddings'] = embedding
    return record
records = [process_and_embed_record(record) for record in dataset_df.to_dict(orient='records')]
These embeddings will allow us to perform semantic searches later, finding movies that are conceptually similar to a given query. Notice that this process might take some time, especially for larger datasets, as we’re making an API call for each movie.
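If the one-call-per-movie pattern becomes a bottleneck, the OpenAI embeddings endpoint also accepts a list of inputs, so an optional optimisation is to embed titles and overviews in batches. The sketch below is not part of the article’s pipeline and assumes every text is a non-empty string:
def get_embeddings_batch(texts, batch_size=100):
    # Embed many texts per request instead of one API call per movie.
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = openai.embeddings.create(
            input=batch,
            model="text-embedding-3-small",
            dimensions=1536,
        )
        # The response preserves input order, one embedding per text.
        all_embeddings.extend(item.embedding for item in response.data)
    return all_embeddings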
Step 4: Data Ingestion into MongoDB
We establish a connection to our MongoDB database:
def get_mongo_client(mongo_uri):
    client = MongoClient(mongo_uri, appname="pmr.movie.python")
    print("Connection to MongoDB successful")
    return client
mongo_client = get_mongo_client(MONGO_URI)
database_name = "movies_dataset"
collection_name = "movies"
db = mongo_client.get_database(database_name)
collection = db.get_collection(collection_name)
collection.delete_many({})
We insert our processed and embedded data into MongoDB, which allows us to efficiently store and query our movie data, including the high-dimensional embeddings:
movies = [Movie(**record).dict() for record in records]
collection.insert_many(movies)
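A quick check confirms that the documents, embeddings included, landed in the collection as expected:
print(collection.count_documents({}))  # should match the number of processed movies
sample_doc = collection.find_one({}, {"Title": 1, "text_embeddings": 1})
print(sample_doc["Title"], len(sample_doc["text_embeddings"]))  # a title and 1536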
Step 5: Creating a Vector Search Index in MongoDB Atlas
Before we can perform vector search operations, we need to create a vector search index. This step can be done directly in the MongoDB Atlas platform:
- Log in to your MongoDB Atlas account
- Navigate to your cluster
- Go to the “Search & Vector Search” tab
- Click on “Create Search Index”
- Choose “JSON Editor” in the “Atlas Vector Search” section and use the following configuration:
{
  "fields": [
    {
      "numDimensions": 1536,
      "path": "text_embeddings",
      "similarity": "cosine",
      "type": "vector"
    }
  ]
}
The idea is to create a vector search index named “vector_index_text” on the “text_embeddings” field. We use cosine similarity because it finds movies with similar themes or content by comparing the direction of their embedding vectors while ignoring differences in magnitude, which makes it well suited for matching a user’s query to movie descriptions.
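To make the “ignores differences in magnitude” point concrete, here is a small NumPy sketch (not part of the pipeline) showing that scaling a vector does not change its cosine similarity to another vector:
import numpy as np

def cosine_similarity(a, b):
    a, b = np.asarray(a), np.asarray(b)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

v1 = [0.2, 0.7, 0.1]
v2 = [0.4, 1.4, 0.2]   # same direction as v1, twice the magnitude
v3 = [0.9, 0.1, 0.0]   # different direction
print(cosine_similarity(v1, v2))  # 1.0 -- identical direction, length ignored
print(cosine_similarity(v1, v3))  # noticeably lower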
Step 6: Implementing Vector Search
Now, we implement the vector search function. The following function is meant to perform a vector search in our MongoDB collection. It first generates an embedding for the user’s query. It then constructs a MongoDB aggregation pipeline using the $vectorSearch operator. The search looks for the 20 nearest neighbors among 150 candidates.
def vector_search(user_query, db, collection, vector_index="vector_index_text", max_retries=3):
    query_embedding = get_embedding(user_query)
    if query_embedding is None:
        return "Invalid query or embedding generation failed."
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,
            "queryVector": query_embedding,
            "path": "text_embeddings",
            "numCandidates": 150,
            "limit": 20
        }
    }
    pipeline = [vector_search_stage]
    for attempt in range(max_retries):
        try:
            results = list(collection.aggregate(pipeline))
            if results:
                explain_query_execution = db.command(
                    'explain', {
                        'aggregate': collection.name,
                        'pipeline': pipeline,
                        'cursor': {}
                    },
                    verbosity='executionStats')
                vector_search_explain = explain_query_execution['stages'][0]['$vectorSearch']
                millis_elapsed = vector_search_explain['explain']['collectStats']['millisElapsed']
                print(f"Total time for the execution to complete on the database server: {millis_elapsed} milliseconds")
                return results
            else:
                print(f"No results found on attempt {attempt + 1}. Retrying...")
                time.sleep(2)
        except Exception as e:
            print(f"Error on attempt {attempt + 1}: {str(e)}")
            time.sleep(2)
    return "Failed to retrieve results after multiple attempts."
We implement a retry mechanism (up to 3 attempts) to handle potential transient issues. The function executes the explain command as well, which provides detailed information about the query execution.
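Before wiring this into the LLM step, you can call the function directly and inspect which titles it retrieves for a test query:
results = vector_search("a space adventure with a strong female lead", db, collection)
if isinstance(results, list):  # the function returns a string on failure
    for doc in results[:5]:
        print(doc["Title"], "-", round(doc["Vote_Average"], 1))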
Step 7: Handling User Queries with an LLM
Finally, we can handle user queries. First, we define a SearchResultItem class to structure our search results. The handle_user_query function then ties everything together: it performs a vector search based on the user’s query, formats the results into a pandas DataFrame, uses OpenAI’s gpt-3.5-turbo model to generate a response grounded in those results, and prints both the query and the generated answer:
class SearchResultItem(BaseModel):
    Title: str
    Overview: str
    Genre: List[str]
    Vote_Average: float
    Popularity: float

def handle_user_query(query, db, collection):
    get_knowledge = vector_search(query, db, collection)
    if isinstance(get_knowledge, str):
        return get_knowledge, "No source information available."
    search_results_models = [SearchResultItem(**result) for result in get_knowledge]
    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])
    completion = openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a movie recommendation system."},
            {"role": "user", "content": f"Answer this user query: {query} with the following context:\n{search_results_df}"}
        ]
    )
    system_response = completion.choices[0].message.content
    print(f"- User Question:\n{query}\n")
    print(f"- System Response:\n{system_response}\n")
    return system_response
This function demonstrates the core value of the RAG approach: we generate a contextually appropriate response by first retrieving relevant information from our own database.
Step 8: Using the RAG Pipeline
To use this RAG pipeline, you can now make queries like this:
query = """
I'm in the mood for a highly-rated action movie. Can you recommend something popular?
Include a reason for your recommendation.
"""
handle_user_query(query, db, collection)
The system will give a response similar to this:
I recommend "Spider-Man: No Way Home" as a popular and highly-rated action
movie for you to watch. With a vote average of 8.3 and a popularity score
of 5083.954, this film has garnered a lot of attention and positive
reviews from audiences.
"Spider-Man: No Way Home" is a thrilling action-packed movie that brings
together multiple iterations of Spider-Man in an epic crossover event. It
offers a blend of intense action sequences, emotional depth, and nostalgic
moments that fans of the superhero genre will surely enjoy. So, if you're
in the mood for an exciting action movie with a compelling storyline and
fantastic visual effects, "Spider-Man: No Way Home" is an excellent choice
for your movie night.
Conclusion
Building a RAG pipeline involves several steps, from data loading and modeling to embedding generation and vector search. This example showcases how a RAG pipeline can provide informative, context-aware responses by combining the specific movie data in our database with the natural language understanding and generation capabilities of the language model. On top of this, we use MongoDB because it is well-suited for this type of workflow due to its native vector search capabilities, flexible document model, and scalability.
You can expand on this system by adding more data, fine-tuning your embeddings, or implementing more complex recommendation algorithms.
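As one example of an extension, ordinary aggregation stages can be appended after the $vectorSearch stage to post-filter or trim the retrieved candidates, for instance keeping only well-rated movies. The snippet below is a hedged sketch of that idea (the query text and rating threshold are arbitrary), not code from the pipeline above:
query_embedding = get_embedding("a feel-good family comedy")
pipeline = [
    {"$vectorSearch": {
        "index": "vector_index_text",
        "queryVector": query_embedding,
        "path": "text_embeddings",
        "numCandidates": 150,
        "limit": 20,
    }},
    {"$match": {"Vote_Average": {"$gte": 7.0}}},  # keep only well-rated candidates
    {"$project": {"_id": 0, "Title": 1, "Genre": 1, "Vote_Average": 1}},
]
filtered = list(collection.aggregate(pipeline))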
For the complete code and additional resources, check out the GitHub repository. The dataset used in this project is sourced from Kaggle and has been granted CC0 1.0 Universal (CC0 1.0) Public Domain Dedication by the original author. You can find the dataset and more information here.