Tag: AI

  • AI Knocking on the Classroom’s Door

    AI Knocking on the Classroom’s Door

    Dylan Seychell

    Reflections on How I Use AI in my University Lectures and Assessment

    I would like to start with a clear disclaimer: While I have been lecturing at a university level since 2011, I am not a formally trained educator. What I am sharing in this article are my personal views and learning methods that I developed with my students. These might not necessarily be the same views as those of the University of Malta, where I lecture.

    Generative AI requires educators to rethink assessment and embrace transparency for a student-centred, ethically informed learning future.

    Author x DALL-E 3

    This is how I structured my reflections in this article:

• Why do I teach?
• How did we get here?
• Is AI a threat to education?
• What can educators do about it?
• And what about graded work?
• Way forward?

    Why do I teach?

    I am an AI scientist, and teaching is a core part of my academic role, which I enjoy. In class, I believe my duty is to find the most effective ways to communicate my research area to my students. My students are my peers, and we are on a learning journey together in an area of study that is fast-evolving and heavily impacting humanity. Since 2015, I have also been leading Malta’s Google Developers Group (GDG), allowing me to share learning experiences with different professionals.

In other words, my responsibility is two-fold: 1) research the subject matter, and 2) facilitate my students' learning experience by sharing my knowledge and pushing boundaries as a team.

    How did we get here?

I was expecting the events of December 2022. At the end of May 2020, OpenAI released GPT-3, demonstrating the capability of generating good-quality text with a few gentle prompts. It was the first time we could see an AI technique ‘writing something about itself,’ as this article reported. However, this was only accessible and ‘usable’ by those with a relatively strong technical background. In the summer of 2022, OpenAI released the beta version of its text-to-image generator DALL-E. Long story short: AI was getting better and more accessible month after month. I recently wrote another article about handling this fast rate of change.

    ChatGPT, Gemini, Claude and all the generative AI products that followed made this transformative technology accessible.

    Google Trends result for AI with a clear spike after the release of ChatGPT in December 2022

    Generative AI empowers us to explore countless topics and even achieve more. Do you need to draft an email complaint to your insurance company? Do you need to plan a marketing campaign? Do you need a SWOT analysis for your new project? Do you need a name for a pet? The list goes on. It’s there, quickly and freely accessible for anything we can think about.

    So why not help us understand something better? Why not help students learn in more efficient and effective ways? Why not help us rethink our delivery?

    Is AI a threat to education?

    Short answer: No. Long answer: it’s a bit more complicated than it sounds.

    EdTech or not?

    This article is not about the formal aspect of EdTech or e-Learning. These terms generally refer to software and hardware or their combination with educational theory to facilitate learning. There is well-researched work, such as a book by Matthew Montebello, a colleague of mine, titled “AI-Injected e-Learning”, which covers this area more extensively. In this article, I focus on off-the-shelf chat interfaces I use in class.

    Generative AI

The term ‘Generative AI’ does not help. While it fits most use cases, it triggers a negative connotation in education. [This is the same for art, which deserves another article.] As educators, we do not want our students to generate answers for the tasks we give them and hand them in for grading. That is why there is usually an adverse first reaction to AI in the learning process.

    But that’s only until we explore how this powerful technology can positively transform learning experiences—those of students and educators alike. From my experience, the more I use these methods in class, the more I broaden my expertise and knowledge in the subject area while finding better ways to communicate complex concepts.

    Is it plagiarism?

    I notice that many link the use of AI to plagiarism. The definition of plagiarism is “the unacknowledged use, as one’s own, of work of another person, whether or not such work has been published.” I do not wish to get into this debate, but I can comfortably say that I see a clear distinction between the use of AI and pre-AI plagiarism.

    Shall we detect it using software?

    Let’s be clear: It is wrong if a student or academic uses AI to generate content and submit it as if it were their work. It is academic misconduct, and it is helpful not to categorise it as plagiarism.

    We expect authenticity when we consume any work, whether casual like this article or a formal academic paper. The lack of authenticity is what bothers us when it comes to plagiarism and using AI to generate content.

Generative AI works so well because (among other principles) it is based on a probabilistic approach. This means that every output differs from the one before, even if the same prompt is used to create it. If I copy someone else’s work and submit it as my own, what I copied can be compared against the original and detected as plagiarism. Using Generative AI to create your work is a different form of misconduct, and because the generation is probabilistic, it follows that detecting it is also probabilistic.

For this reason, I am against using any tool that claims to detect AI-generated content in an educational setting. While such a tool can indicate whether someone used AI to generate work, the lack of certainty makes me uncomfortable taking action against a student, action which might impact their career, when I know the detection could be a false positive.

    Would you decide the fate of your students by flipping a coin? (Author x DALL-E 3)

If still in doubt, I wish to share that this is also OpenAI’s perspective. In January 2023 (less than a month after releasing ChatGPT), they launched an AI text classifier which claimed to predict whether a body of text is or is not AI-generated. Guess what happened? They shut it down by July 2023 because it was “impossible to make this prediction”. So why should you jeopardise a student’s future on an unreliable probabilistic decision?

    What can educators do about it?

    One of the main challenges in using AI is that it can limit critical thinking if students rely on generated answers. There is also a reasonable concern that this technology will deepen educational inequities if access to it is uneven.

    Fight, Freeze or Flight? That would only mean we’re giving up on education because we have this new revolutionary technology. This is an opportunity and the start of a new era in education.

    My three guiding principles in handling this are the following:

    1. Transparency and Open Dialogue — We need these values in education, the workplace and society. So, let’s start by making it easy for our students to be transparent and open in how they use these tools in an educational setting where it should always be a safe space to try out new ideas.
    2. Focus on the Learning Process—Education has (nearly) always been about the final output, be it examinations, assignments, or dissertations. This is an opportunity to shift the focus towards helping students demonstrate their thought processes in understanding the material and finding solutions to real-life problems.
    3. Educate and Don’t Punish—We are there to educate and not punish, starting with a positive outlook. If we are transparent and provide students with a process that empowers them to be open about the way they use AI, we will have countless opportunities to give them constructive feedback about how they use this technology well without the need to punish them.

    AI is providing me with an opportunity to renew the way I teach. This is how I am making the best out of this opportunity:

    1. Personalised Learning — Ask students to prompt an AI system about a topic in various ways to help them better grasp the concept.
    2. Stimulate Class Discussions—Invite students to discuss the output they got from AI systems and moderate the discussion to open the way for new topics to be explored while also handling any misconceptions about the topic.
    3. Give students more realistic and real-life challenges — I use this to invite more mature students to apply topics covered in class to real-life scenarios or topics featured in current affairs.
4. Helping those with learning difficulties — If you know that some of your students have learning difficulties, AI can help you create variations of your content tailored to students with different abilities.
    5. Restructuring my courses — Once a course is over, I take feedback from students about what I can improve. I then combine the feedback with the course information and have ‘conversations’ with an AI system to help me reflect on fine-tuning the subsequent delivery, especially when introducing new topics.
    6. Improving my course material — I find AI systems helpful when rewriting specific topics and pitching the delivery to students with varying backgrounds. You can also use Generative AI to suggest different ways of delivering the same content or material.

    And what about graded work?

    This is the elephant in the room. It’s easy to favour using AI to have a better experience in class, but what about its use during graded assignments or work?

    I reflected a lot about this when this technology became very accessible. I decided to zoom out and look at the context holistically and from first principles. These are the principles upon which I base my decisions:

    1. AI has been evolving for 80 years and is here to stay
    2. These AI tools will only get better due to commercial interests in them
    3. My students today are tomorrow’s workforce, and employers will expect them to use AI to be more productive
    4. I want my students to be smart about using different tools and accountable to their leaders in how they use AI
5. There are so many topics which I wish to cover in class, but it has (until now) been difficult to do so.

    The way forward was/is clear: My duty is to design assessments that motivate my students, ensure their understanding, and prepare them to build a better tomorrow. The actionable way forward was to restructure my assessments to match this new reality and motivate students to use AI in an accountable manner.

    Type of Assessment

    The availability of AI challenged the nature of assessment. Some questions and tasks can be inputted as prompts, and students can get an outstanding output that they can submit without giving it much thought. While such behaviour is not right and shouldn’t happen, it also says a lot about the assessments we give students.

Consider the case of an essay about a concept, such as freedom of speech, where students would be expected to write 2000 words about the subject. The process would be that students leave class, write the essay, and submit it for correction; the lecturer then corrects it and gives a grade. This approach is an open invitation to the dilemmas I mentioned above.

    Generative AI Journal

    What do we look for in an employee? I believe that accountability tops the list because anything else follows. Based on this reasoning, I decided to develop the idea of a Generative AI Journal for my students to complete when they’re working on assignments I give. This journal explains how they used generative AI and how it went. In return, they get marks (in the region of 10%).

The key reasoning is the following. I am fine with my employees using AI to be more productive, but I need to know how they use it. This will help me be a better mentor and evaluate the task. Students are tomorrow’s employees. They will work in an environment where AI is available and ready for use. They will be expected to be as productive as possible. The aggregate of all these thoughts is that generative AI should be used accountably and transparently. Based on this reasoning, I came up with the idea of this journal. This 10-page journal is structured as follows:

    1. Introduction: Briefly describe the generative AI models (ChatGPT, Gemini, VS Co-Pilot, etc.) used and the rationale behind that choice. (Maximum of 1 page)
    2. Ethical Considerations: Discuss the ethical aspects of using generative AI in the project. This should include issues like data bias and privacy together with measures of good academic conduct (Maximum of 1 page)
    3. Methodology: Outline the methods and steps to integrate the generative AI model into the work. How did Generative AI fit into the assignment workflow?
    4. Prompts and Responses: List the specific prompts used with the generative AI model that contributed to levelling up the work. Include the generated response for each prompt and explain how it improved your project.
    5. Improvements and Contributions: Discuss the specific areas where generative AI enhanced the deliverables. This can include, but is not limited to, data analysis, formulation of ethical considerations, enhancement of literature reviews, or idea generation.
    6. Individual Reflection: Reflection on the personal experience using generative AI in the project. Discuss what was learned, what surprised you, and how your perspective on using AI in academic projects has changed, if at all.
    7. References and List of Resources Used

    This is not something definitive. It is a work in progress that I am constantly updating to reflect the needs of my students and the context of the Generative AI evolution.

    Way forward?

    This is not a journey with a destination. The probable scenario is that education (and the workplace) will have to evolve along with the evolution of Generative AI.

    At the start of this article, I shared my perspective of seeing my students as my peers in the research journey. I’m linking this perspective to the way forward. At the end of the first semester of the academic year 2023/24, I distributed an anonymous questionnaire to 60 of my students. In this article, I am sharing the responses to two questions where I asked them what they thought about the way forward after using Generative AI, as described above.

When asked how they envision the future of Generative AI in education, 56% said that it will play a significant but not central role, 12% said it will only have a moderate role, and 30% said it will be a crucial part of education. This is meaningful because it shows that students feel and see the value of using this technology in class, and that it should augment human educators rather than replace them.

I asked my students for advice on whether I should keep using Generative AI in class. 92% think I should…and I will. The remaining 8% were unsure, and that is also meaningful. Above all, this is a work in progress, and more work should be done to deliver meaningful educational experiences…but this doesn’t mean we shouldn’t start trying today.

This technology will improve every week and is here to stay. At the time of publishing this article, OpenAI released the GPT-4o model and Google released countless AI products and features at Google I/O. We’re still scratching the surface of possibilities.

    The future is not about fearing AI in the classroom. It is about using it as humanity’s latest invention to handle knowledge and information, hence empowering all stakeholders in the educational system.

    Dr Dylan Seychell is a resident academic at the University of Malta’s Department of Artificial Intelligence, specialising in Computer Vision and Applied Machine Learning. With a background in academia and industry, he holds a PhD in Computer Vision and has published extensively on AI in international peer-reviewed conferences, journals and books.

    Outside of the academic setting, he actively applies his expertise through his enterprise, focusing on enhancing tourist experiences through technological and cultural heritage innovations. He leads Malta’s Google Developers Group (GDG), is a technical expert certified by the Malta Digital Innovation Authority and is a Tourism Operators Business Section committee member within The Malta Chamber.




  • Using LLMs to Learn From YouTube

    Alok Suresh

    Image created by author using Midjourney

    A conversational question-answering tool built using LangChain, Pinecone, Flask, React and AWS

    Introduction

Have you ever encountered a podcast or a video you wanted to watch, but struggled to find the time due to its length? Have you wished for an easy way to refer back to specific sections of content in this form?

    This is an issue I’ve faced many times when it comes to YouTube videos of popular podcasts like The Diary of a CEO. Indeed, a lot of the information covered in podcasts such as this is readily available through a quick Google search. But listening to an author’s take on something they are passionate about, or hearing about a successful entrepreneur’s experience from their perspective tends to provide much more insight and clarity.

Motivated by this problem and a desire to educate myself on LLM-powered applications and their development, I decided to build a chatbot which allows users to ask questions about the content of YouTube videos using the RAG (Retrieval Augmented Generation) framework. In the rest of this post, I’ll talk through my experience developing this application using LangChain, Pinecone, Flask, and React, and deploying it with AWS.

    I’ve limited code snippets to those that I think will be most useful. For anyone interested, the complete codebase for the application can be found here.

    Backend

    We’ll be using the transcripts of YouTube videos as the source from which the LLM generates answers to user-defined questions. To facilitate this, the backend will need a method of retrieving and appropriately storing these for use in real time, as well as one for using them to generate answers. We also want a way of storing chat histories so that users can refer back to them at a later time. Let’s see how the backend can be developed to satisfy all of these requirements now.

    Response generation

    Since this is a conversational question-answering tool, the application must be able to generate answers to questions while taking both the relevant context and chat history into account. This can be achieved using Retrieval Augmented Generation with Conversation Memory, as illustrated below:

    For clarity, the steps involved are as follows:

1. Question summarisation: The current question and the chat history are condensed into a standalone question using an appropriate prompt asking the LLM to do so (a sketch of such a prompt is shown after this list).
    2. Semantic search: Next, the YouTube transcript chunks that are most relevant to this condensed question must be retrieved. The transcripts themselves are stored as embeddings, which are numerical representations of words and phrases, learned by an embedding model that captures their content and semantics. During the semantic search, the components of each transcript whose embeddings are most similar to those of the condensed question are retrieved.
    3. Context-aware generation: These retrieved transcript chunks are then used as the context within another prompt to the LLM asking it to answer the condensed question. Using the condensed question ensures that the generated answer is relevant to the current question as well as previous questions asked by the user during the chat.
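To make step 1 a little more concrete, here is a minimal sketch of what such a question-condensing prompt can look like using LangChain's PromptTemplate; the wording of the prompt is illustrative and may differ from the one used in the application:

from langchain.prompts import PromptTemplate

# Illustrative prompt for step 1: condense the chat history and the follow-up
# question into a single standalone question (wording is an example only).
condense_question_prompt = PromptTemplate.from_template(
    "Given the following conversation and a follow-up question, rephrase the "
    "follow-up question to be a standalone question.\n\n"
    "Chat history:\n{chat_history}\n\n"
    "Follow-up question: {question}\n\n"
    "Standalone question:"
)

print(
    condense_question_prompt.format(
        chat_history="Human: What does the guest think about buying a house?\nAI: ...",
        question="Why does he think that?",
    )
)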

    Data Pipeline

    Before moving on to the implementation of the process outlined above, let’s take a step back and focus on the YouTube video transcripts themselves. As discussed, they must be stored as embeddings to efficiently search for and retrieve them during the semantic search phase of the RAG process. Let’s go through the source, method of retrieval and method of storage for these now.

1. Source: YouTube provides access to metadata like video IDs, as well as autogenerated transcripts, through its Data API. To begin with, I’ve selected this playlist from The Diary of a CEO podcast, in which various money experts and entrepreneurs discuss personal finance, investing, and building successful businesses.
2. Retrieval: I make use of one class responsible for retrieving metadata on YouTube videos, like video IDs, by interacting directly with the YouTube Data API, and another which uses the youtube-transcript-api Python package to retrieve the video transcripts. These transcripts are then stored as JSON files in an S3 bucket in their raw form.
3. Storage: Next, the transcripts need to be converted to embeddings and stored in a vector database. However, a pre-requisite to this step is splitting them into chunks so that upon retrieval, we get segments of text that are of the highest relevance to each question while also minimising the length of the LLM prompt itself. To satisfy this requirement I define a custom S3JsonFileLoader class here (due to some issues with LangChain’s out-of-the-box version), and make use of a text splitter object to split the transcripts at load time. I then make use of LangChain’s interface to the Pinecone vector store (my vector store of choice for efficient storage, search and retrieval of the transcript embeddings) to store the transcript chunks as embeddings generated with OpenAI’s embedding model, ready to be retrieved as context for the gpt-3.5-turbo model:
import os

import pinecone
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Pinecone
from langchain.embeddings.openai import OpenAIEmbeddings

from chatytt.embeddings.s3_json_document_loader import S3JsonFileLoader

# Define the splitter with which to split the transcripts into chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=pre_processing_conf["recursive_character_splitting"]["chunk_size"],
    chunk_overlap=pre_processing_conf["recursive_character_splitting"]["chunk_overlap"],
)

# Load and split the transcript
loader = S3JsonFileLoader(
    bucket="s3_bucket_name",
    key="transcript_file_name",
    text_splitter=text_splitter,
)
transcript_chunks = loader.load(split_doc=True)

# Connect to the relevant Pinecone vector store
pinecone.init(api_key=os.environ.get("PINECONE_API_KEY"), environment="gcp-starter")
pinecone_index = pinecone.Index(os.environ.get("INDEX_NAME"), pool_threads=4)

# Store the transcript chunks as embeddings in Pinecone
vector_store = Pinecone(
    index=pinecone_index,
    embedding=OpenAIEmbeddings(),
    text_key="text",
)
vector_store.add_documents(documents=transcript_chunks)

We can also make use of a few AWS services to automate these steps using a workflow configured to run periodically. I do this by implementing each of the three steps mentioned above in separate AWS Lambda functions (a form of serverless computing, which provisions and utilises resources as needed at runtime), and defining the order of their execution using AWS Step Functions (a serverless orchestration tool). This workflow is then executed by an Amazon EventBridge schedule which I’ve set to run once a week, so that any new videos added to the playlist are retrieved and processed automatically:
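For illustration, a handler for the transcript-retrieval Lambda step might look roughly like the sketch below; the event shape, the bucket environment variable and the key naming are assumptions made for the example rather than the project's exact code:

import json
import os

import boto3
from youtube_transcript_api import YouTubeTranscriptApi

s3 = boto3.client("s3")


def lambda_handler(event, context):
    # Assumed event shape: the previous Step Functions state passes in video IDs.
    video_ids = event["video_ids"]

    for video_id in video_ids:
        # Fetch the autogenerated transcript and store it in S3 in raw JSON form.
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        s3.put_object(
            Bucket=os.environ["TRANSCRIPTS_BUCKET"],  # assumed environment variable
            Key=f"raw-transcripts/{video_id}.json",
            Body=json.dumps(transcript),
        )

    return {"processed_video_ids": video_ids}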

Note that I have obtained permission from The Diary of a CEO channel to use the transcripts of videos from the playlist mentioned above. Anyone wishing to use third-party content in this way should first obtain permission from the original owners.

    Implementing RAG

Now that the transcripts for our playlist of choice are periodically being retrieved, converted to embeddings and stored, we can move on to the implementation of the core backend functionality for the application, i.e. the process of generating answers to user-defined questions using RAG. Luckily, LangChain has a ConversationalRetrievalChain that does exactly that out of the box! All that’s required is to pass in the query, the chat history, a vector store object that can be used to retrieve transcript chunks, and an LLM of choice into this chain like so:

import os

import pinecone
from langchain.chains import ConversationalRetrievalChain
from langchain.chat_models import ChatOpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Pinecone

# Define the vector store with which to perform a semantic search against
# the Pinecone vector database
pinecone.init(api_key=os.environ.get("PINECONE_API_KEY"), environment="gcp-starter")
pinecone_index = pinecone.Index(os.environ.get("INDEX_NAME"), pool_threads=4)
vector_store = Pinecone(
    index=pinecone_index,
    embedding=OpenAIEmbeddings(),
    text_key="text",
)

# Define the retrieval chain that will perform the steps in RAG
# with conversation memory as outlined above.
chain = ConversationalRetrievalChain.from_llm(
    llm=ChatOpenAI(), retriever=vector_store.as_retriever()
)

# Call the chain, passing in the current question and the chat history
response = chain({"question": query, "chat_history": chat_history})["answer"]

I’ve also implemented the functionality of this chain from scratch, as described in LangChain’s tutorials, using both LangChain Expression Language here and SequentialChain here. These may provide more insight into all of the actions taking place under the hood in the chain used above.

    Saving Chat History

The backend can generate answers to questions now, but it would also be nice to store and retrieve chat history so that users can refer to old chats as well. Since the access pattern is known (the same item is retrieved and updated per user), I decided to use DynamoDB, a NoSQL database known for its speed and cost efficiency in handling unstructured data of this form. In addition, the boto3 SDK simplifies interaction with the database, requiring just a few functions for storing and retrieving data:

import os
import time
from typing import List, Any

import boto3

table = boto3.resource("dynamodb").Table(os.environ.get("CHAT_HISTORY_TABLE_NAME"))


def fetch_chat_history(user_id: str) -> dict:
    response = table.get_item(Key={"UserId": user_id})
    return response["Item"]


def update_chat_history(user_id: str, chat_history: List[dict[str, Any]]):
    chat_history_update_data = {
        "UpdatedTimestamp": {"Value": int(time.time()), "Action": "PUT"},
        "ChatHistory": {"Value": chat_history, "Action": "PUT"},
    }
    table.update_item(
        Key={"UserId": user_id}, AttributeUpdates=chat_history_update_data
    )


def is_new_user(user_id: str) -> bool:
    response = table.get_item(Key={"UserId": user_id})
    return response.get("Item") is None


def create_chat_history(user_id: str, chat_history: List[dict[str, Any]]):
    item = {
        "UserId": user_id,
        "CreatedTimestamp": int(time.time()),
        "UpdatedTimestamp": None,
        "ChatHistory": chat_history,
    }
    table.put_item(Item=item)

    Exposing Logic via an API

    We have now covered all of the core functionality, but the client side of the app with which the user interacts will need some way of triggering and making use of these processes. To facilitate this, each of the three pieces of logic (generating answers, saving chat history, and retrieving chat history) are exposed through separate endpoints within a Flask API, which will be called by the front end:

from dotenv import load_dotenv
from flask import Flask, request, jsonify
from flask_cors import CORS

from chatytt.chains.standard import ConversationalQAChain
from chatytt.vector_store.pinecone_db import PineconeDB
from server.utils.chat import parse_chat_history
from server.utils.dynamodb import (
    is_new_user,
    fetch_chat_history,
    create_chat_history,
    update_chat_history,
)

load_dotenv()
app = Flask(__name__)

# Enable Cross-Origin Resource Sharing since the server and client
# will be hosted separately
CORS(app)

pinecone_db = PineconeDB(index_name="youtube-transcripts", embedding_source="open-ai")
chain = ConversationalQAChain(vector_store=pinecone_db.vector_store)


@app.route("/get-query-response/", methods=["POST"])
def get_query_response():
    data = request.get_json()
    query = data["query"]

    raw_chat_history = data["chatHistory"]
    chat_history = parse_chat_history(raw_chat_history)
    response = chain.get_response(query=query, chat_history=chat_history)

    return jsonify({"response": response})


@app.route("/get-chat-history/", methods=["GET"])
def get_chat_history():
    user_id = request.args.get("userId")

    if is_new_user(user_id):
        response = {"chatHistory": []}
        return jsonify({"response": response})

    response = {"chatHistory": fetch_chat_history(user_id=user_id)["ChatHistory"]}

    return jsonify({"response": response})


@app.route("/save-chat-history/", methods=["PUT"])
def save_chat_history():
    data = request.get_json()
    user_id = data["userId"]

    if is_new_user(user_id):
        create_chat_history(user_id=user_id, chat_history=data["chatHistory"])
    else:
        update_chat_history(user_id=user_id, chat_history=data["chatHistory"])

    return jsonify({"response": "chat saved"})


if __name__ == "__main__":
    app.run(debug=True, port=8080)

    Lastly, I use AWS Lambda to wrap the three endpoints in a single function that is then triggered by an API Gateway resource, which routes requests to the correct endpoint by constructing an appropriate payload for each as needed. The flow of this setup now looks as follows:
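As a rough sketch of that wrapping function (the real handler lives in the repository and may differ), a single Lambda can dispatch on the request path and delegate to the three pieces of logic shown above; the small functions at the top are stand-ins for that logic, included only so the sketch is self-contained:

import json

# Stand-ins for the Flask view logic shown earlier (illustrative only).
def get_query_response(body):
    return {"answer": "..."}

def get_chat_history(user_id):
    return {"chatHistory": []}

def save_chat_history(body):
    return "chat saved"


def lambda_handler(event, context):
    # Route the API Gateway event to the right piece of logic based on its path.
    path = event.get("path", "")
    body = json.loads(event["body"]) if event.get("body") else {}
    params = event.get("queryStringParameters") or {}

    if path == "/get-query-response/":
        result = get_query_response(body)
    elif path == "/get-chat-history/":
        result = get_chat_history(params.get("userId"))
    elif path == "/save-chat-history/":
        result = save_chat_history(body)
    else:
        return {"statusCode": 404, "body": json.dumps({"error": "unknown route"})}

    return {"statusCode": 200, "body": json.dumps({"response": result})}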

    Frontend

    With the backend for the app complete, I’ll briefly cover the implementation of the user interface in React, giving special attention to the interactions with the server component housing the API above.

    I make use of dedicated functional components for each section of the app, covering all of the typical requirements one might expect in a chatbot application:

    • A container for user inputs with a send chat button.
    • A chat feed in which user inputs and answers are displayed.
    • A sidebar containing chat history, a new chat button and a save chat button.

    The interaction between these components and the flow of data is illustrated below:

The API calls to each of the three endpoints, and the subsequent change of state of the relevant variables on the client side, are defined in separate functional components:

    1. The logic for retrieving the generated answer to each question:
import React from "react";
import {chatItem} from "./LiveChatFeed";

interface Props {
    setCurrentChat: React.SetStateAction<any>
    userInput: string
    currentChat: Array<chatItem>
    setUserInput: React.SetStateAction<any>
}

function getCurrentChat({setCurrentChat, userInput, currentChat, setUserInput}: Props){
    // The current chat is displayed in the live chat feed. Since we don't want
    // to wait for the LLM response before displaying the user's question in
    // the chat feed, copy it to a separate variable and pass it to the current
    // chat before pinging the API for the answer.
    const userInputText = userInput
    setUserInput("")
    setCurrentChat([
        ...currentChat,
        {
            "text": userInputText,
            isBot: false
        }
    ])

    // Create the API payload for the POST request
    const options = {
        method: 'POST',
        headers: {
            "Content-Type": 'application/json',
            'Accept': 'application/json'
        },
        body: JSON.stringify({
            query: userInputText,
            chatHistory: currentChat
        })
    }

    // Ping the endpoint, wait for the response, and add it to the current chat
    // so that it appears in the live chat feed.
    fetch(`${import.meta.env.VITE_ENDPOINT}get-query-response/`, options).then(
        (response) => response.json()
    ).then(
        (data) => {
            setCurrentChat([
                ...currentChat,
                {
                    "text": userInputText,
                    "isBot": false
                },
                {
                    "text": data.response,
                    "isBot": true
                }
            ])
        }
    )
}

export default getCurrentChat

    2. Saving chat history when the user clicks the save chat button:

import React, {useState} from "react";
import {chatItem} from "./LiveChatFeed";
import saveIcon from "../assets/saveicon.png"
import tickIcon from "../assets/tickicon.png"

interface Props {
    userId: String
    previousChats: Array<Array<chatItem>>
}

function SaveChatHistoryButton({userId, previousChats}: Props){
    // Define a state to determine if the current chat has been saved or not.
    const [isChatSaved, setIsChatSaved] = useState(false)

    // Construct the payload for the PUT request to save chat history.
    const saveChatHistory = () => {
        const options = {
            method: 'PUT',
            headers: {
                "Content-Type": 'application/json',
                'Accept': 'application/json'
            },
            body: JSON.stringify({
                "userId": userId,
                "chatHistory": previousChats
            })
        }

        // Ping the API with the chat history, and set the state of
        // isChatSaved to true if successful
        fetch(`${import.meta.env.VITE_ENDPOINT}save-chat-history/`, options).then(
            (response) => response.json()
        ).then(
            (data) => {
                setIsChatSaved(true)
            }
        )
    }

    // Display text on the save chat button dynamically depending on the value
    // of the isChatSaved state.
    return (
        <button
            className="save-chat-history-button"
            onClick={() => {saveChatHistory()}}
        >
            <img className={isChatSaved ? "tick-icon-img" : "save-icon-img"} src={isChatSaved ? tickIcon : saveIcon}/>
            {isChatSaved ? "Chats Saved" : "Save Chat History"}
        </button>
    )
}

export default SaveChatHistoryButton

    3. Retrieving chat history when the app first loads up:

import React from "react";
import {chatItem} from "./LiveChatFeed";

interface Props {
    userId: String
    previousChats: Array<Array<chatItem>>
    setPreviousChats: React.SetStateAction<any>
}

function getUserChatHistory({userId, previousChats, setPreviousChats}: Props){
    // Create the payload for the GET request
    const options = {
        method: 'GET',
        headers: {
            "Content-Type": 'application/json',
            'Accept': 'application/json'
        }
    }

    // Since this is a GET request pass in the user id as a query parameter.
    // Set the previousChats state to the chat history returned by the API.
    fetch(`${import.meta.env.VITE_ENDPOINT}get-chat-history/?userId=${userId}`, options).then(
        (response) => response.json()
    ).then(
        (data) => {
            if (data.response.chatHistory.length > 0) {
                setPreviousChats([
                    ...previousChats,
                    ...data.response.chatHistory
                ])
            }
        }
    )
}

export default getUserChatHistory

For the UI itself, I chose something very similar to ChatGPT’s own interface, with a central chat feed component and a sidebar containing supporting content like chat histories. Some quality-of-life features for the user include automatic scrolling to the most recently created chat item, and previous chats loading upon sign-in (I have not included these in the article, but you can find their implementation in the relevant functional component here). The final UI appears as shown below:

Now that we have a fully functional UI, all that’s left is hosting it for use online, which I’ve chosen to do with AWS Amplify. Among other things, Amplify is a fully managed web hosting service that handles resource provisioning and hosting of web applications. User authentication for the app is managed by Amazon Cognito, allowing user sign-up and sign-in, alongside handling credential storage and management:

    Comparison to ChatGPT responses

Now that we’ve discussed the process of building the app, let’s take a deep dive into the responses generated for some questions, and compare them to the same questions posed to ChatGPT*.

Note that this type of comparison is inherently an “unfair” one, since the underlying prompts to the LLM used in our application will contain additional context (in the form of relevant transcript chunks) retrieved from the semantic search step. However, it will allow us to qualitatively assess just how much of a difference the prompts created using RAG make to the responses generated by the same underlying LLM.

    *All ChatGPT responses are from gpt-3.5, since this was the model used in the application.

    Example 1:

Suppose you want to learn about the contents of this video, in which Steven Bartlett chats to Morgan Housel, a financial writer and investor. Based on the title of the video, it looks like he’s against buying a house — but suppose you don’t have time to watch the whole thing to find out why. Here is a snippet of the conversation I had with the application asking about it:

    You can also see the conversation memory in action here, where in follow-up questions I make no mention of Morgan Housel explicitly or even the words “house” or “buying”. Since the summarised query takes previous chat history into account, the response from the LLM reflects previous questions and their answers. The portion of the video in which Housel mentions the points above can be found roughly an hour and a half into the podcast — around the 1:33:00–1:41:00 timestamp.

    I asked ChatGPT the same thing, and as expected got a very generic answer that is non-specific to Housel’s opinion.

    It’s arguable that since the video came out after the model’s last “knowledge update” the comparison is flawed, but Housel’s opinions are also well documented in his book ‘The Psychology of Money’ which was published in 2020. Regardless, the reliance on these knowledge updates further highlights the benefits of context-aware answer generation over standalone models.

Example 2:

Below are some snippets from a chat about this discussion with Alex Hormozi, a monetization and acquisitions expert. From the title of the video, it looks like he knows a thing or two about successfully scaling businesses, so I ask for more details on this:

    This seems like a reasonable answer, but let’s see if we can extract any more information from the same line of questioning.

    Notice the level of detail the LLM is able to extract from the YouTube transcripts. All of the above can be found over a 15–20 minute portion of the video around the 17:00–35:00 timestamp.

    Again, the same question posed to ChatGPT returns a generic answer about the entrepreneur but lacks the detail made available through the context within the video transcripts.

    Deployment

The final thing we’ll discuss is the process of deploying each of the components on AWS. The data pipeline, backend, and frontend are each contained within their own CloudFormation stacks (collections of AWS resources). Allowing these to be deployed in isolation like this ensures that the entire app is not redeployed unnecessarily during development. I make use of AWS SAM (Serverless Application Model) to deploy the infrastructure for each component as code, leveraging the SAM template specification and CLI:

• The SAM template specification — A shorthand syntax that serves as an extension to AWS CloudFormation for defining and configuring collections of AWS resources, how they should interact, and any required permissions.
• The SAM CLI — A command line tool used, among other things, for building and deploying resources as defined in a SAM template. It handles the packaging of application code and dependencies, converting the SAM template to CloudFormation syntax, and deploying templates as individual stacks on CloudFormation.

    Rather than including the complete templates (resource definitions) of each component, I will highlight specific areas of interest for each service we’ve discussed throughout the post.

    Passing sensitive environment variables to AWS resources:

External components like the YouTube Data API, OpenAI API and Pinecone API are relied upon heavily throughout the application. Although it is possible to hardcode these values into the CloudFormation templates and pass them around as ‘parameters’, a safer method is to create secrets for each in AWS Secrets Manager and reference these secrets in the template like so:

Parameters:
  YoutubeDataAPIKey:
    Type: String
    Default: '{{resolve:secretsmanager:youtube-data-api-key:SecretString:youtube-data-api-key}}'
  PineconeAPIKey:
    Type: String
    Default: '{{resolve:secretsmanager:pinecone-api-key:SecretString:pinecone-api-key}}'
  OpenaiAPIKey:
    Type: String
    Default: '{{resolve:secretsmanager:openai-api-key:SecretString:openai-api-key}}'

    Defining a Lambda Function:

    These units of serverless code form the backbone of the data pipeline and serve as an entry point to the backend for the web application. To deploy these using SAM, it’s as simple as defining the path to the code that the function should run when invoked, alongside any required permissions and environment variables. Here is an example of one of the functions used in the data pipeline:

FetchLatestVideoIDsFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: ../code_uri/.
    Handler: chatytt.youtube_data.lambda_handlers.fetch_latest_video_ids.lambda_handler
    Policies:
      - AmazonS3FullAccess
    Environment:
      Variables:
        PLAYLIST_NAME:
          Ref: PlaylistName
        YOUTUBE_DATA_API_KEY:
          Ref: YoutubeDataAPIKey

    Retrieving the definition of the data pipeline in Amazon States Language:

    In order to use Step Functions as an orchestrator for the individual Lambda functions in the data pipeline, we need to define the order in which each should be executed as well as configurations like max retry attempts in Amazon States Language. An easy way to do this is by using the Workflow Studio in the Step Functions console to diagrammatically create the workflow, and then take the autogenerated ASL definition of the workflow as a starting point that can be altered appropriately. This can then be linked in the CloudFormation template rather than being defined in place:

EmbeddingRetrieverStateMachine:
  Type: AWS::Serverless::StateMachine
  Properties:
    DefinitionUri: statemachine/embedding_retriever.asl.json
    DefinitionSubstitutions:
      FetchLatestVideoIDsFunctionArn: !GetAtt FetchLatestVideoIDsFunction.Arn
      FetchLatestVideoTranscriptsArn: !GetAtt FetchLatestVideoTranscripts.Arn
      FetchLatestTranscriptEmbeddingsArn: !GetAtt FetchLatestTranscriptEmbeddings.Arn
    Events:
      WeeklySchedule:
        Type: Schedule
        Properties:
          Description: Schedule to run the workflow once per week on a Monday.
          Enabled: true
          Schedule: cron(0 3 ? * 1 *)
    Policies:
      - LambdaInvokePolicy:
          FunctionName: !Ref FetchLatestVideoIDsFunction
      - LambdaInvokePolicy:
          FunctionName: !Ref FetchLatestVideoTranscripts
      - LambdaInvokePolicy:
          FunctionName: !Ref FetchLatestTranscriptEmbeddings

    See here for the ASL definition used for the data pipeline discussed in this post.

    Defining the API resource:

    Since the API for the web app will be hosted separately from the front-end, we must enable CORS (cross-origin resource sharing) support when defining the API resource:

ChatYTTApi:
  Type: AWS::Serverless::Api
  Properties:
    StageName: Prod
    Cors:
      AllowMethods: "'*'"
      AllowHeaders: "'*'"
      AllowOrigin: "'*'"

    This will allow the two resources to communicate freely with each other. The various endpoints made accessible through a Lambda function can be defined like so:

ChatResponseFunction:
  Type: AWS::Serverless::Function
  Properties:
    Runtime: python3.9
    Timeout: 120
    CodeUri: ../code_uri/.
    Handler: server.lambda_handler.lambda_handler
    Policies:
      - AmazonDynamoDBFullAccess
    MemorySize: 512
    Architectures:
      - x86_64
    Environment:
      Variables:
        PINECONE_API_KEY:
          Ref: PineconeAPIKey
        OPENAI_API_KEY:
          Ref: OpenaiAPIKey
    Events:
      GetQueryResponse:
        Type: Api
        Properties:
          RestApiId: !Ref ChatYTTApi
          Path: /get-query-response/
          Method: post
      GetChatHistory:
        Type: Api
        Properties:
          RestApiId: !Ref ChatYTTApi
          Path: /get-chat-history/
          Method: get
      UpdateChatHistory:
        Type: Api
        Properties:
          RestApiId: !Ref ChatYTTApi
          Path: /save-chat-history/
          Method: put

    Defining the React app resource:

AWS Amplify can build and deploy applications using a reference to the relevant GitHub repository and an appropriate access token:

AmplifyApp:
  Type: AWS::Amplify::App
  Properties:
    Name: amplify-chatytt-client
    Repository: https://github.com/suresha97/ChatYTT
    AccessToken: '{{resolve:secretsmanager:github-token:SecretString:github-token}}'
    IAMServiceRole: !GetAtt AmplifyRole.Arn
    EnvironmentVariables:
      - Name: ENDPOINT
        Value: !ImportValue 'chatytt-api-ChatYTTAPIURL'

Once the repository itself is accessible, Amplify will look for a configuration file with instructions on how to build and deploy the app:

version: 1
frontend:
  phases:
    preBuild:
      commands:
        - cd client
        - npm ci
    build:
      commands:
        - echo "VITE_ENDPOINT=$ENDPOINT" >> .env
        - npm run build
  artifacts:
    baseDirectory: ./client/dist
    files:
      - "**/*"
  cache:
    paths:
      - node_modules/**/*

    As a bonus, it is also possible to automate the process of continuous deployment by defining a branch resource that will be monitored and used to re-deploy the app automatically upon further commits:

AmplifyBranch:
  Type: AWS::Amplify::Branch
  Properties:
    BranchName: main
    AppId: !GetAtt AmplifyApp.AppId
    EnableAutoBuild: true

With deployment finalised in this way, the app is accessible to anyone with the link made available from the AWS Amplify console. A recorded demo of the app being accessed like this can be found here:

    Conclusion

    At a high level, we have covered the steps behind:

    • Building a data pipeline for the collection and storage of content as embeddings.
    • Developing a backend server component which performs Retrieval Augmented Generation with Conversation Memory.
    • Designing a user interface for surfacing generated answers and chat histories.
• Connecting and deploying these components to create a solution that provides value and saves time.

We’ve seen how an application like this can be used to streamline and, in some ways, ‘optimise’ the consumption of content such as YouTube videos for learning and development purposes. But these methods can just as easily be applied in the workplace for internal use or for augmenting customer-facing solutions. This is why LLMs, and the RAG technique in particular, have garnered so much attention in many organisations.

    I hope this article has provided some insight into how these relatively new techniques can be utilised alongside more traditional tools and frameworks for developing user-facing applications.

    Acknowledgements

I would like to thank The Diary of a CEO team for their permission to use the transcripts of videos from this playlist in this project, and in the writing of this article.

    All images, unless otherwise noted, are by the author.




  • Create a multimodal assistant with advanced RAG and Amazon Bedrock

    Create a multimodal assistant with advanced RAG and Amazon Bedrock

    Alfred Shen

In this post, we present a new approach named multimodal RAG (mmRAG) to tackle those existing limitations in greater detail. The solution intends to address these limitations for practical generative artificial intelligence (AI) assistant use cases. Additionally, we examine potential solutions to enhance the capabilities of large language models (LLMs) and visual language models (VLMs) with advanced LangChain capabilities, enabling them to generate more comprehensive, coherent, and accurate outputs while effectively handling multimodal data.


  • How 20 Minutes empowers journalists and boosts audience engagement with generative AI on Amazon Bedrock

    How 20 Minutes empowers journalists and boosts audience engagement with generative AI on Amazon Bedrock

    Aurélien Capdecomme

    This post is co-written with Aurélien Capdecomme and Bertrand d’Aure from 20 Minutes. With 19 million monthly readers, 20 Minutes is a major player in the French media landscape. The media organization delivers useful, relevant, and accessible information to an audience that consists primarily of young and active urban readers. Every month, nearly 8.3 million 25–49-year-olds choose […]


  • Reinforcement Learning for feature selection

    Reinforcement Learning for feature selection

    Baptiste Lefort

    Revolutionizing Large Dataset Feature Selection with Reinforcement Learning

    Leverage the power of reinforcement learning for feature selection when faced with very large datasets

    Discover how reinforcement learning transforms feature selection for machine learning models. Learn the process, implementation, and benefits of this innovative approach with practical examples and a dedicated Python library.

Photo by Jared Murray on Unsplash

Feature selection is a decisive step in the process of building a machine learning model. Selecting the right features for the model and the task we want to achieve can significantly improve performance. Indeed, an irrelevant feature can add noise and disturb the model.

Selecting features becomes even more important when dealing with a high-dimensional dataset: it enables the model to learn faster and better. The idea is then to find the optimal number of features and the most meaningful ones.

In this article, I will tackle this problem and go beyond it by introducing a newly implemented method for feature selection. Although many different feature selection processes exist, they will not be introduced here, since a lot of articles already deal with them. I will focus on feature selection using a reinforcement learning strategy.

First, reinforcement learning, and more specifically the Markov Decision Process, will be addressed. It is a very new approach in the data science domain, especially for feature selection purposes. I will then introduce an implementation of it and show how to install and use the Python library (FSRLearning). Finally, I will prove the efficiency of this implementation. Among possible feature selection approaches such as wrappers or filtering, reinforcement learning is the most powerful and efficient.

The goal of this article is to emphasise the implementation for concrete, real-problem-oriented use. The theoretical aspects of this problem are simplified with examples, although some references are available at the end.

Reinforcement Learning: The Markov Decision Problem for feature selection

It has been demonstrated that reinforcement learning (RL) techniques can be very efficient for problems like game solving. The concept of RL is based on the Markov Decision Process (MDP). The point here is not to define the MDP in depth, but to convey the general idea of how it works and how it can be useful for our problem.

The naive idea behind RL is that an agent starts in an unknown environment. This agent has to take actions to complete a task. Depending on its current state and the actions it has selected previously, the agent will be more inclined to choose some actions over others. At every new state reached and action taken, the agent receives a reward. Here are the main elements we need to define for feature selection purposes:

• What is a state?
• What is an action?
• What are the rewards?
• How do we choose an action?

Firstly, the state is merely a subset of the features that exist in the data set. For example, if the data set has three features (Age, Gender, Height) plus one label, here are the possible states:

[]                                             --> Empty set
[Age], [Gender], [Height]                      --> 1-feature set
[Age, Gender], [Gender, Height], [Age, Height] --> 2-feature set
[Age, Gender, Height]                          --> All-feature set

In a state, the order of the features does not matter; why will be explained a little later in the article. We have to consider it as a set, not a list, of features.

Concerning the actions: from a subset, we can go to any subset with exactly one not-yet-explored feature more than the current state. In the feature selection problem, an action therefore consists of selecting a feature not yet in the current state and adding it to obtain the next state. Here is a sample of possible actions:

    [Age] -> [Age, Gender]
    [Gender, Height] -> [Age, Gender, Height]

Here are examples of impossible actions:

    [Age] -> [Age, Gender, Height]
    [Age, Gender] -> [Age]
    [Gender] -> [Gender, Gender]

We have defined the states and the actions, but not the reward. The reward is a real number used to evaluate the quality of a state. For example, if a robot is trying to reach the exit of a maze and decides, as its next action, to move towards the exit, then the reward associated with this action will be “good”. If it selects moving into a trap as its next action, the reward will be “not good”. The reward is a value that carries information about the quality of the previous action taken.

In the feature selection problem, an interesting reward could be the amount of accuracy added to the model by including a new feature. Here is an example of how the reward is computed:

    [Age] --> Accuracy = 0.65
    [Age, Gender] --> Accuracy = 0.76
    Reward(Gender) = 0.76 - 0.65 = 0.11

For each state visited for the first time, a classifier is trained on that set of features. The resulting accuracy is stored in the state, and the training of the classifier, which is very costly, only happens once, even if the state is reached again later. The classifier does not consider the order of the features; this is why we can see this problem as a graph rather than a tree. In this example, the reward of the action of selecting Gender as a new feature for the model is the difference in accuracy between the next state and the current state.

    Example of the possible states and actions with rewards associated to each
    Each state has several possible actions and rewards associated (Image by the author)

On the graph above, each feature has been mapped to a number (i.e. “Age” is 1, “Gender” is 2 and “Height” is 3). It is entirely possible to maximise other metrics to find the optimal set. In many business applications, recall matters more than accuracy.
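As an illustration of the caching of state values and of the reward computation described above, here is a minimal sketch; it is not the FSRLearning implementation, and the choice of a random forest with cross-validation is an assumption made for the example (X is assumed to be a pandas DataFrame of features and y the labels):

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

state_values = {}  # maps a frozenset of features to the accuracy measured for it

def evaluate_state(features, X, y):
    # Train (once) and cache the accuracy of a classifier for this feature subset.
    key = frozenset(features)
    if key not in state_values:
        if not features:
            state_values[key] = 0.0  # baseline value assumed for the empty set
        else:
            model = RandomForestClassifier(n_estimators=50)
            state_values[key] = cross_val_score(model, X[list(features)], y, cv=3).mean()
    return state_values[key]

def reward(current_state, new_feature, X, y):
    # Reward of an action = accuracy of the next state minus accuracy of the current one.
    next_state = set(current_state) | {new_feature}
    return evaluate_state(next_state, X, y) - evaluate_state(current_state, X, y)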

The next important question is how we select the next state from the current state, or in other words how we explore our environment. We have to find an optimal way to do this, since it can quickly become a very complex problem. Indeed, if we naively explore all the possible sets of features in a problem with 10 features, the number of states would be

    10! + 2 = 3 628 802 possible states

    The +2 is because we consider an empty state and a state that contains all the possible features. In this problem we would have to train the same model on all the states to get the set of features that maximises the accuracy. In the RL approach we will not have to go in all the states and to train a model every time that we go in an already visited state.

We also had to define stop conditions for this problem; they will be detailed later. For state selection, the epsilon-greedy strategy has been chosen: from the current state, with probability epsilon (between 0 and 1, often around 0.2) we select the next action at random, and otherwise we select the action that maximises a function. For feature selection, that function is the average reward each feature has brought to the model's accuracy.

The epsilon-greedy algorithm involves two phases (see the sketch after this list):

1. A random phase: with probability epsilon, the next state is selected at random among the possible neighbours of the current state (either a uniform or a softmax selection can be used).
2. A greedy phase: the next state is selected so that the feature added to the current state contributes the largest accuracy gain to the model. To reduce the time complexity, a list containing this value for each feature is maintained and updated every time a feature is chosen. The update is cheap thanks to the following formula:
Update of the average reward list for each feature (Image by the author)
• AORf: average of the rewards brought by feature "f"
• k: number of times that "f" has been selected
• V(F): value of the state with feature set F (not detailed in this article for clarity)
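
Putting the two phases together, here is a minimal illustrative sketch of the epsilon-greedy choice and of the running-average (AOR) update; the update follows the standard incremental-mean formula, which is how I read the figure above, so treat the exact expression as an assumption:

import random

def update_aor(aor, counts, feature, observed_value):
    """Incrementally update the average of rewards (AOR) for a feature.
    aor and counts are dictionaries keyed by feature name."""
    prev = aor.get(feature, 0.0)
    counts[feature] = counts.get(feature, 0) + 1
    aor[feature] = prev + (observed_value - prev) / counts[feature]

def choose_next_feature(current_state, all_features, aor, eps=0.2):
    """Epsilon-greedy choice of the next feature to add to the current state."""
    candidates = [f for f in all_features if f not in current_state]
    if random.random() < eps:
        return random.choice(candidates)                     # random (exploration) phase
    return max(candidates, key=lambda f: aor.get(f, 0.0))    # greedy phase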

The overall idea is to find which feature has brought the most accuracy to the model. This is why we need to visit many different states: to evaluate, across many different contexts, the most globally accurate estimate of a feature's value to the model.

Finally, I will detail the two stop conditions. Since the goal is to minimise the number of states the algorithm visits, we need to be careful about them: the fewer never-before-visited states we enter, the fewer models we have to train on different sets of features. Training a model to obtain its accuracy is the most costly phase in terms of time and computing power.

1. The algorithm always stops at the final state, which is the set containing all the features. We want to avoid reaching this state, since it is the most expensive one to train a model on.
2. It also stops browsing the graph if a sequence of visited states sees its values degrade successively. A threshold has been set: once the number of successive degradations exceeds the square root of the total number of features in the dataset, the exploration of that sequence stops (a rough sketch follows this list).
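
As a rough illustration of how these two conditions could be checked (again, this is not the library's exact code):

import math

def should_stop(current_state, all_features, recent_rewards):
    """Stop a sequence when the full feature set is reached, or when the last
    sqrt(n_features) rewards were all degradations (my reading of the threshold
    described above; the library's exact rule may differ)."""
    if len(current_state) == len(all_features):
        return True
    patience = int(math.sqrt(len(all_features)))
    if patience > 0 and len(recent_rewards) >= patience:
        if all(r < 0 for r in recent_rewards[-patience:]):
            return True
    return False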

Now that the modelling of the problem has been explained, we will detail the implementation in Python.

The Python library for Feature Selection with Reinforcement Learning

A Python library that solves this problem is available. In this part, I will explain how it works and show that it is an efficient strategy. This article also serves as documentation, and you will be able to use the library in your own projects by the end of this section.

    1. The data pre-processing

Since we need to evaluate the accuracy of each visited state, we need to feed a model with the features and the data used for this feature selection task. The data has to be normalised, the categorical variables encoded, and the dataset should have as few rows as possible (the smaller it is, the faster the algorithm runs). It is also very important to create a mapping between the features and integers, as explained in the previous part; this step is not mandatory but strongly recommended. The outcome of this step is one DataFrame containing all the features and another containing the labels to predict. Below is an example with a dataset used as a benchmark (it can be found in the UCI Machine Learning Repository).

    2. Installation and importation of the FSRLearning library

    The second step is to install the library with pip. Here is the command to install it:

    pip install FSRLearning

Once the library is installed, it can be imported and a feature selector created by instantiating a Feature_Selector_RL object (a sketch is shown after the parameter list below). Some parameters need to be filled in:

• feature_number (integer): number of features in the DataFrame X
• feature_structure (dictionary): dictionary for the graph implementation
• eps (float in [0, 1]): probability of choosing a random next state; 0 gives a purely greedy algorithm and 1 a purely random one
• alpha (float in [0, 1]): controls the update rate; 0 means state values are barely updated and 1 means they are updated aggressively
• gamma (float in [0, 1]): moderation factor for the observation of the next state; 0 gives short-sighted behaviour and 1 far-sighted behaviour
• nb_iter (int): number of sequences to go through the graph
• starting_state ("empty" or "random"): if "empty", the algorithm starts from the empty state; if "random", it starts from a random state in the graph

All the parameters can be tuned, but for most problems only a few iterations are needed (around 100), and an epsilon value of about 0.2 is often enough. The starting state can help browse the graph more efficiently, but it is very dataset-dependent, so both values are worth testing.

Finally, the selector can be initialised and trained very simply, following the same pattern as most ML libraries.
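
Since the original code screenshots are not reproduced here, the snippet below is only a sketch based on the parameter list above; the import path, the exact class name and the training method name are assumptions to check against the library's documentation.

from FSRLearning import Feature_Selector_RL  # import path assumed

# X: DataFrame of features, y: Series/array of labels, both prepared in step 1
fsrl_obj = Feature_Selector_RL(
    feature_number=X.shape[1],   # number of features in X
    feature_structure={},        # graph structure, filled in during exploration (assumed default)
    eps=0.2,                     # probability of a random (exploration) step
    alpha=0.8,                   # update rate (example value)
    gamma=0.8,                   # moderation/discount factor (example value)
    nb_iter=100,                 # number of sequences through the graph
    starting_state="empty",      # start every sequence from the empty set
)

# Training is assumed to follow the usual scikit-learn-like pattern;
# the method name below is an assumption
results = fsrl_obj.fit_predict(X, y)
print(results)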

    Here is an example of the output:

    Output of the selector (Image by the author)

    The output is a 5-tuple as follows:

    • Index of the features in the DataFrame X (like a mapping)
• Number of times that the feature has been observed
    • Average of reward brought by the feature after all the iterations
    • Ranking of the features from the least to the most important (here 2 is the least and 7 the most important feature)
    • Number of states globally visited

Another important method of this selector provides a comparison with the RFE selector from Scikit-Learn. It takes X, y and the results of the selector as input.

The output is a print of the global metric for RFE and FSRLearning after each selection step. It also outputs a visual comparison of model accuracy, with the number of selected features on the x-axis and the accuracy on the y-axis. The two horizontal lines are the median accuracy for each method. Here is an example:

    Comparison between RL and RFE method (Image by the author)
    Average benchmark accuracy : 0.854251012145749, rl accuracy : 0.8674089068825909 
    Median benchmark accuracy : 0.8552631578947368, rl accuracy : 0.868421052631579
    Probability to get a set of variable with a better metric than RFE : 1.0
    Area between the two curves : 0.17105263157894512

In this example, the RL method always found a better set of features for the model than RFE: any subset chosen from its sorted set of features gives the model a better accuracy. We can run the model and the comparator several times to get a more precise estimate, and the RL method consistently comes out ahead.

Another interesting method is get_plot_ratio_exploration. It plots a graph comparing, for a given iteration, the number of already-visited nodes with the number of nodes visited in the sequence.

    Comparison of visited and not visited state at each iteration (Image by the author)

Also, thanks to the second stop condition, the time complexity of the algorithm drops sharply, so even when the number of features is large, convergence is reached quickly. The plot below shows the number of times a set of a given size has been visited.

    Number of visited states in function of their size (Image by the author)

In all iterations, the algorithm visited states containing 6 variables or fewer. Beyond 6 variables, the number of states reached decreases. This is good behaviour, since it is faster to train a model on a small set of features than on a large one.

    Conclusion and references

Overall, we can see that the RL method is very efficient for maximising a model's metric. It converges quickly toward an interesting subset of features, and the method is very easy and fast to integrate into ML projects with the FSRLearning library.

The GitHub repository of the project, with complete documentation, is available here.

If you wish to contact me, you can do so directly on LinkedIn here.

    This library has been implemented with the help of these two articles:


    Reinforcement Learning for feature selection was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.


  • Mastering Statistical Tests

    Sheref Nasereldin, Ph.D.

    Your Guide to Choosing the Right Test for Your Data (Part I)

    Photo by Adam Nowakowski on Unsplash

Have you ever had a dataset and found yourself lost and confused about which statistical significance test is most suitable to answer your research question? Well, let me assure you, you're not alone. I was once that person! Despite my respect for Statistics, I never had a great passion for it. In this article, I will focus on unraveling some key concepts to help you make informed decisions when choosing the right statistical significance test for your data. Since statistical significance testing essentially involves dealing with variables (independent and dependent), I find it imperative to first revisit the different types of those variables.

    Types of data:

    Photo by Claudio Schwarz on Unsplash

    1- Categorical or nominal

    A categorical (or nominal) variable has two or more categories without intrinsic order. For instance, eye color is a categorical variable with categories like blue, green, brown, and hazel. There is no agreed way to rank these categories. If a variable has a clear order, it is an ordinal variable, discussed below.

    2- Ordinal

    An ordinal variable is like a categorical variable, but with a clear order. For example, consider customer satisfaction levels: dissatisfied, neutral, and satisfied. These categories can be ordered, but the spacing between them is not consistent. Another example is pain severity: mild, moderate, and severe. Although we can rank these levels, the difference in pain between each category varies. If the categories were equally spaced, the variable would be an interval variable.

    3- Interval or numerical

    An interval (or numerical) variable, unlike an ordinal variable, has equally spaced intervals between values. For instance, consider temperature measured in Celsius. The difference between 20°C and 30°C is the same as between 30°C and 40°C. This equal spacing distinguishes interval variables from ordinal variables.

Are you still pondering the consequences of not correctly identifying the type of data? Let's clarify with a simple example. Imagine needing to compute the mean of a dataset that is categorical or ordinal. Does this hold any meaningful interpretation? For instance, what would the average "eye color" signify? It's clearly nonsensical. That said, it should also be emphasized that the type of data is not the only factor in determining the statistical test: the number of independent and dependent variables stands on an equal footing with the type of the data.
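
As a tiny illustration of the point above, pandas will happily average an interval variable but not a nominal one:

import pandas as pd

df = pd.DataFrame({
    "eye_color": ["blue", "green", "brown", "hazel"],  # nominal variable
    "temperature_c": [20.0, 25.0, 30.0, 35.0],         # interval variable
})

print(df["temperature_c"].mean())   # 27.5 -- a meaningful average

try:
    print(df["eye_color"].mean())
except TypeError as err:
    # Averaging categories has no meaningful interpretation
    print(f"Cannot average a nominal variable: {err}")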

    Photo by Samuel Regan-Asante on Unsplash

    I would also like to remind you that there is no need to be intimidated by the number of tests to be discussed. One good way to think about these tests is that they are different approaches to calculate the p-value. The p-value itself can be conceived as a measure of the statistical compatibility of the data with the null hypothesis. That is,

    The smaller the p-value, the greater the statistical incompatibility of the data with the null hypothesis, if the underlying assumptions used to calculate the p-value hold. This incompatibility can be interpreted as casting doubt on or providing evidence against the null hypothesis or the underlying assumptions.

Now, let us delve, without any further ado, into the different tests and when and how to use each of them.

    Statistical tests:

    1- One sample student’s t-test

The one-sample t-test is a statistical test used to determine whether the mean of a single sample of data (drawn from a normally distributed interval variable) differs significantly from a known or hypothesized population mean. It is commonly used across fields to assess whether a sample is representative of a larger population, or to test hypotheses about population means when the population standard deviation is unknown.

    import pandas as pd
    from scipy import stats
    import numpy as np

    # Sample data (scores of 20 students)
    scores = [72, 78, 80, 73, 69, 71, 76, 74, 77, 79, 75, 72, 70, 73, 78, 76, 74, 75, 77, 79]

    # Population mean under the null hypothesis
    pop_mean = 75

    # Create a pandas DataFrame
    df = pd.DataFrame(scores, columns=['Scores'])

    # Calculate sample mean and sample standard deviation
    sample_mean = df['Scores'].mean()
    sample_std = df['Scores'].std(ddof=1) # ddof=1 for sample standard deviation

    # Number of observations
    n = len(df)

    # Perform one-sample t-test
    t_statistic, p_value = stats.ttest_1samp(df['Scores'], pop_mean)

    # Critical t-value for two-tailed test at alpha=0.05 (95% confidence level)
    alpha = 0.05
    t_critical = stats.t.ppf(1 - alpha/2, df=n-1)

    # Output results
    print("Sample Mean:", sample_mean)
    print("Sample Standard Deviation:", sample_std)
    print("Degrees of Freedom (df):", n - 1)
    print("t-statistic:", t_statistic)
    print("p-value:", p_value)
    print("Critical t-value (two-tailed, α=0.05):", t_critical)

    # Decision based on p-value
if p_value < alpha:
    print("Reject the null hypothesis. There is a significant difference between the sample mean and the population mean.")
else:
    print("Fail to reject the null hypothesis. There is no significant difference between the sample mean and the population mean.")

    2- Binomial test

    The test is used to determine if the proportion of successes in a sample is significantly different from a hypothesized proportion. It’s particularly useful when dealing with binary outcomes, such as success/failure or yes/no scenarios. This test is widely used in fields such as medicine, marketing, and quality control, where determining the significance of proportions is crucial.

    from scipy import stats

    # Define the observed number of successes and the number of trials
    observed_successes = 55
    n_trials = 100
    hypothesized_probability = 0.5

# Perform the binomial test (scipy.stats.binomtest replaces the deprecated binom_test)
result = stats.binomtest(observed_successes, n_trials, hypothesized_probability, alternative='two-sided')
p_value = result.pvalue

    print('Results of the binomial test:')
    print(f'Observed successes: {observed_successes}')
    print(f'Number of trials: {n_trials}')
    print(f'Hypothesized probability: {hypothesized_probability}')
    print(f'P-value: {p_value}')

    # Set significance level
    alpha = 0.05

    # Decision based on p-value
if p_value < alpha:
    print("Reject the null hypothesis: The coin is not fair.")
else:
    print("Fail to reject the null hypothesis: There is no evidence to suggest the coin is not fair.")

    3- Chi-square goodness of fit

    The test is used to determine if an observed frequency distribution of a categorical variable differs significantly from an expected distribution. It helps assess whether the observed data fits a specific theoretical distribution. This test is widely used in fields such as genetics, marketing, and psychology to validate hypotheses about distributions.

    import numpy as np
    from scipy.stats import chisquare

    # Observed frequencies
    observed = np.array([25, 30, 20, 25])

    # Expected frequencies for a uniform distribution
    expected = np.array([25, 25, 25, 25])

    # Perform Chi-Square Goodness of Fit test
    chi2_stat, p_value = chisquare(f_obs=observed, f_exp=expected)

    print('Results of the Chi-Square Goodness of Fit test:')
    print(f'Observed frequencies: {observed}')
    print(f'Expected frequencies: {expected}')
    print(f'Chi-square statistic: {chi2_stat}')
    print(f'P-value: {p_value}')

    # Set significance level
    alpha = 0.05

    # Decision based on p-value
if p_value < alpha:
    print("Reject the null hypothesis: The observed distribution does not fit the expected distribution.")
else:
    print("Fail to reject the null hypothesis: The observed distribution fits the expected distribution.")

    4- Two independent samples t-test

The test is used to compare the means of a normally distributed continuous dependent variable between two independent groups. For instance, imagine we're assessing the impact of a medical intervention. We recruit 100 participants, randomly assigning 50 to a treatment group and 50 to a control group. Here, we have two distinct samples, making the unpaired t-test appropriate for comparing their outcomes.

    import numpy as np
    from scipy import stats

    # Generate example data (normally distributed)
    np.random.seed(42) # for reproducibility
    treatment_group = np.random.normal(loc=75, scale=10, size=50)
    control_group = np.random.normal(loc=72, scale=10, size=50)

    # Perform independent samples t-test
    t_statistic, p_value = stats.ttest_ind(treatment_group, control_group)

    # Decision based on p-value
    alpha = 0.05
if p_value < alpha:
    print("Reject the null hypothesis: There is a significant difference in the treatment effect between groups.")
else:
    print("Fail to reject the null hypothesis: There is no significant difference in the treatment effect between groups.")

    5- Wilcoxon-Mann-Whitney test (Mann-Whitney U test)

It is a non-parametric test, meaning it makes no assumptions about the variables' distributions, used to compare the medians of two independent groups. It assesses whether the distributions of two samples differ without assuming the data follow a specific distribution. This test is particularly useful when the assumptions of the independent samples t-test (such as normality and equal variance) are not met, or when analyzing ordinal or interval data that do not meet parametric assumptions.

    import numpy as np
    from scipy.stats import mannwhitneyu

    # Generate example data
    np.random.seed(42) # for reproducibility
    group1 = np.random.normal(loc=50, scale=10, size=30)
    group2 = np.random.normal(loc=55, scale=12, size=35)

    # Perform Wilcoxon-Mann-Whitney test
    statistic, p_value = mannwhitneyu(group1, group2)

    # Print results
    print('Results of the Wilcoxon-Mann-Whitney test:')
    print(f'Statistic: {statistic}')
    print(f'P-value: {p_value}')

    # Decision based on p-value
    alpha = 0.05
if p_value < alpha:
    print("Reject the null hypothesis: The distributions of the two groups are significantly different.")
else:
    print("Fail to reject the null hypothesis: There is no significant difference in the distributions of the two groups.")

    6- Chi-square test of independence

The chi-square test of independence is used to determine if there is a significant association between two categorical variables. It helps identify whether the distribution of one variable is independent of the other. This test is widely applied in fields like marketing, social sciences, and biology. To perform it, you first need to pivot the data to create a contingency table, as shown in the Python code below. Additionally, the chi-square test assumes that the expected value for each cell is five or higher. To find the expected value of a specific cell, we multiply the row total by the column total and then divide by the grand total. If this condition is not met, we must use the next test (Fisher's exact test).

    import numpy as np
    import pandas as pd
    from scipy.stats import chi2_contingency

    # Create a contingency table
data = pd.DataFrame({
    'Gender': ['Male', 'Male', 'Female', 'Female'],
    'Preference': ['Yes', 'No', 'Yes', 'No'],
    'Count': [20, 10, 30, 40]
})

    # Pivot the data to get the contingency table
    contingency_table = data.pivot(index='Gender', columns='Preference', values='Count').fillna(0).values

    # Perform Chi-Square Test of Independence
    chi2_stat, p_value, dof, expected = chi2_contingency(contingency_table)

    # Print results
    print('Results of the Chi-Square Test of Independence:')
    print(f'Chi-square statistic: {chi2_stat}')
    print(f'P-value: {p_value}')
    print(f'Degrees of freedom: {dof}')
    print('Expected frequencies:')
    print(expected)

    # Decision based on p-value
    alpha = 0.05
if p_value < alpha:
    print("Reject the null hypothesis: There is a significant association between gender and product preference.")
else:
    print("Fail to reject the null hypothesis: There is no significant association between gender and product preference.")

    7- Fisher’s exact test

This test can be thought of as an alternative to the chi-square test when one or more of your contingency table cells has an expected frequency of less than five. This makes it particularly valuable for small sample sizes or when dealing with sparse data.

    import numpy as np
    from scipy.stats import fisher_exact

    # Create a contingency table
    # Example data: treatment group vs. control group with success and failure outcomes
    # Treatment group: 12 successes, 5 failures
    # Control group: 8 successes, 7 failures
contingency_table = np.array([[12, 5],
                              [8, 7]])

    # Perform Fisher's Exact Test
    odds_ratio, p_value = fisher_exact(contingency_table)

    # Print results
print("Results of Fisher's Exact Test:")
    print(f'Odds ratio: {odds_ratio}')
    print(f'P-value: {p_value}')

    # Decision based on p-value
    alpha = 0.05
if p_value < alpha:
    print("Reject the null hypothesis: There is a significant association between the treatment and the outcome.")
else:
    print("Fail to reject the null hypothesis: There is no significant association between the treatment and the outcome.")

    8- Paired t-test

This is the 'dependent' version of the Student's t-test that I covered previously. The test is used to compare the means of two related groups to determine if there is a statistically significant difference between them. It is commonly applied in before-and-after studies, or when the same subjects are measured under two different conditions.

    import numpy as np
    from scipy.stats import ttest_rel

    # Example data: test scores before and after a training program
    before = np.array([70, 75, 80, 85, 90])
    after = np.array([72, 78, 85, 87, 93])

    # Perform paired t-test
    t_statistic, p_value = ttest_rel(before, after)

    # Print results
    print('Results of the paired t-test:')
    print(f'T-statistic: {t_statistic}')
    print(f'P-value: {p_value}')

    # Decision based on p-value
    alpha = 0.05
if p_value < alpha:
    print("Reject the null hypothesis: There is a significant difference between the before and after scores.")
else:
    print("Fail to reject the null hypothesis: There is no significant difference between the before and after scores.")
    Photo by Tim Mossholder on Unsplash

    Okay, were you able to guess the next test? If you are thinking that we will now relax the normality condition and thus need a non-parametric test, then congratulations. This non-parametric test is called the Wilcoxon Signed-Rank Test.

    9- Wilcoxon signed-rank test

    The Wilcoxon Signed-Rank Test is a non-parametric test used to compare two related samples or repeated measurements on a single sample to assess whether their population mean ranks differ. It is often used as an alternative to the paired t-test when the data does not meet the assumptions of normality.

    import numpy as np
    from scipy.stats import wilcoxon

    # Example data: stress scores before and after a meditation program
    before = np.array([10, 15, 20, 25, 30])
    after = np.array([8, 14, 18, 24, 28])

    # Perform Wilcoxon signed-rank test
    statistic, p_value = wilcoxon(before, after)

    # Print results
    print('Results of the Wilcoxon Signed-Rank Test:')
    print(f'Statistic: {statistic}')
    print(f'P-value: {p_value}')

    # Decision based on p-value
    alpha = 0.05
if p_value < alpha:
    print("Reject the null hypothesis: There is a significant difference between the before and after scores.")
else:
    print("Fail to reject the null hypothesis: There is no significant difference between the before and after scores.")

    10- McNemar test

    Yes, it is exactly what you are thinking of; this is the counterpart of the paired t-test but for when the dependent variable is categorical.

    import numpy as np
    from statsmodels.stats.contingency_tables import mcnemar

# Create a 2x2 contingency table of paired outcomes:
# rows = outcome before treatment, columns = outcome after treatment
# [[success before & success after, success before & failure after],
#  [failure before & success after, failure before & failure after]]
contingency_table = np.array([[15, 5],
                              [3, 17]])

    # Perform McNemar test
    result = mcnemar(contingency_table, exact=True)

    # Print results
    print('Results of the McNemar Test:')
    print(f'Statistic: {result.statistic}')
    print(f'P-value: {result.pvalue}')

    # Decision based on p-value
    alpha = 0.05
if result.pvalue < alpha:
    print("Reject the null hypothesis: There is a significant difference between before and after proportions.")
else:
    print("Fail to reject the null hypothesis: There is no significant difference between before and after proportions.")

    Conclusion

In this part, I have covered three main groups of common statistical tests. The first group is needed when analyzing a single population (one-sample Student's t-test, binomial test, and chi-square goodness of fit). The second group (two independent samples t-test, Mann-Whitney U test, and chi-square test of independence, with Fisher's exact test as its small-sample alternative) focuses on calculating p-values when examining the relationship between one dependent variable and one independent variable with exactly two independent groups. In the third group, I addressed the tests (paired t-test, Wilcoxon signed-rank test, and McNemar test) required when the two levels of the independent variable are dependent (paired).

In Part II, I will explore the tests specifically required when the number of levels of a single independent variable (and of the dependent variable) increases beyond two.

    References:

    [1] https://stats.oarc.ucla.edu/sas/whatstat/what-statistical-analysis-should-i-usestatistical-analyses-using-sas/

    [2] https://stats.oarc.ucla.edu/other/mult-pkg/whatstat/#assumption

    [3] https://www.stat.berkeley.edu/~aldous/Real_World/ASA_statement.pdf


    Mastering Statistical Tests was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.


  • Efficient and cost-effective multi-tenant LoRA serving with Amazon SageMaker

    Efficient and cost-effective multi-tenant LoRA serving with Amazon SageMaker

    Michael Nguyen

    In this post, we explore a solution that addresses these challenges head-on using LoRA serving with Amazon SageMaker. By using the new performance optimizations of LoRA techniques in SageMaker large model inference (LMI) containers along with inference components, we demonstrate how organizations can efficiently manage and serve their growing portfolio of fine-tuned models, while optimizing costs and providing seamless performance for their customers. The latest SageMaker LMI container offers unmerged-LoRA inference, sped up with our LMI-Dist inference engine and OpenAI style chat schema. To learn more about LMI, refer to LMI Starting Guide, LMI handlers Inference API Schema, and Chat Completions API Schema.


  • Aggregating Real-time Sensor Data with Python and Redpanda

    Aggregating Real-time Sensor Data with Python and Redpanda

    Tomáš Neubauer

    Simple stream processing using Python and tumbling windows

    Image by author

    In this tutorial, I want to show you how to downsample a stream of sensor data using only Python (and Redpanda as a message broker). The goal is to show you how simple stream processing can be, and that you don’t need a heavy-duty stream processing framework to get started.

    Until recently, stream processing was a complex task that usually required some Java expertise. But gradually, the Python stream processing ecosystem has matured and there are a few more options available to Python developers — such as Faust, Bytewax and Quix. Later, I’ll provide a bit more background on why these libraries have emerged to compete with the existing Java-centric options.

But first let's get to the task at hand. We will use a Python library called Quix Streams as our stream processor. Quix Streams is very similar to Faust, but it has been optimized to be more concise in its syntax and uses a Pandas-like API called Streaming DataFrames.

    You can install the Quix Streams library with the following command:

    pip install quixstreams

    What you’ll build

    You’ll build a simple application that will calculate the rolling aggregations of temperature readings coming from various sensors. The temperature readings will come in at a relatively high frequency and this application will aggregate the readings and output them at a lower time resolution (every 10 seconds). You can think of this as a form of compression since we don’t want to work on data at an unnecessarily high resolution.

    You can access the complete code in this GitHub repository.

    This application includes code that generates synthetic sensor data, but in a real-world scenario this data could come from many kinds of sensors, such as sensors installed in a fleet of vehicles or a warehouse full of machines.

    Here’s an illustration of the basic architecture:

    Diagram by author

    Components of a stream processing pipeline

    The previous diagram reflects the main components of a stream processing pipeline: You have the sensors which are the data producers, Redpanda as the streaming data platform, and Quix as the stream processor.

    Data producers

These are bits of code that are attached to systems that generate data, such as firmware on ECUs (Engine Control Units), monitoring modules for cloud platforms, or web servers that log user activity. They take that raw data and send it to the streaming data platform in a format that the platform can understand.

    Streaming data platform

This is where you put your streaming data. It plays more or less the same role as a database does for static data, but instead of tables, you use topics. Otherwise, it has similar features to a static database: you'll want to manage who can consume and produce data, and what schemas the data should adhere to. Unlike a database though, the data is constantly in flux, so it's not designed to be queried. You'd usually use a stream processor to transform the data and put it somewhere else for data scientists to explore, or sink the raw data into a queryable system optimized for streaming data such as RisingWave or Apache Pinot. However, for automated systems that are triggered by patterns in streaming data (such as recommendation engines), this isn't an ideal solution. In this case, you definitely want to use a dedicated stream processor.

    Stream processors

These are engines that perform continuous operations on the data as it arrives. They could be compared to regular old microservices that process data in any application back end, but there's one big difference. For microservices, data arrives in drips like droplets of rain, and each "drip" is processed discretely. Even if it "rains" heavily, it's not too hard for the service to keep up with the "drops" without overflowing (think of a filtration system that filters out impurities in the water).

    For a stream processor, the data arrives as a continuous, wide gush of water. A filtration system would be quickly overwhelmed unless you change the design. I.e. break the stream up and route smaller streams to a battery of filtration systems. That’s kind of how stream processors work. They’re designed to be horizontally scaled and work in parallel as a battery. And they never stop, they process the data continuously, outputting the filtered data to the streaming data platform, which acts as a kind of reservoir for streaming data. To make things more complicated, stream processors often need to keep track of data that was received previously, such as in the windowing example you’ll try out here.

    Note that there are also “data consumers” and “data sinks” — systems that consume the processed data (such as front end applications and mobile apps) or store it for offline analysis (data warehouses like Snowflake or AWS Redshift). Since we won’t be covering those in this tutorial, I’ll skip over them for now.

    Setting up a local streaming data cluster

    In this tutorial, I’ll show you how to use a local installation of Redpanda for managing your streaming data. I’ve chosen Redpanda because it’s very easy to run locally.

    You’ll use Docker compose to quickly spin up a cluster, including the Redpanda console, so make sure you have Docker installed first.

    Creating the streaming applications

    First, you’ll create separate files to produce and process your streaming data. This makes it easier to manage the running processes independently. I.e. you can stop the producer without stopping the stream processor too. Here’s an overview of the two files that you’ll create:

    • The stream producer: sensor_stream_producer.py
      Generates synthetic temperature data and produces (i.e. writes) that data to a “raw data” source topic in Redpanda. Just like the Faust example, it produces the data at a resolution of approximately 20 readings every 5 seconds, or around 4 readings a second.
    • The stream processor: sensor_stream_processor.py
Consumes (reads) the raw temperature data from the "source" topic and performs a tumbling window calculation to decrease the resolution of the data. It calculates the average of the data received in 10-second windows, so you get a reading for every 10 seconds. It then produces these aggregated readings to the agg-temperature topic in Redpanda.

    As you can see the stream processor does most of the heavy lifting and is the core of this tutorial. The stream producer is a stand-in for a proper data ingestion process. For example, in a production scenario, you might use something like this MQTT connector to get data from your sensors and produce it to a topic.

For this tutorial, it's simpler to simulate the data, so let's get that set up first.

    Creating the stream producer

    You’ll start by creating a new file called sensor_stream_producer.py and define the main Quix application. (This example has been developed on Python 3.10, but different versions of Python 3 should work as well, as long as you are able to run pip install quixstreams.)

    Create the file sensor_stream_producer.py and add all the required dependencies (including Quix Streams)

from dataclasses import dataclass, asdict # used to define the data schema
from datetime import datetime # used to manage timestamps
from time import sleep # used to slow down the data generator
import random # used to generate random readings and intervals
import uuid # used for message id creation
import json # used for serializing data

from quixstreams import Application

    Then, define a Quix application and destination topic to send the data.


    app = Application(broker_address='localhost:19092')

    destination_topic = app.topic(name='raw-temp-data', value_serializer="json")

    The value_serializer parameter defines the format of the expected source data (to be serialized into bytes). In this case, you’ll be sending JSON.

    Let’s use the dataclass module to define a very basic schema for the temperature data and add a function to serialize it to JSON.

@dataclass
class Temperature:
    ts: datetime
    value: int

    def to_json(self):
        # Convert the dataclass to a dictionary
        data = asdict(self)
        # Format the datetime object as a string
        data['ts'] = self.ts.isoformat()
        # Serialize the dictionary to a JSON string
        return json.dumps(data)

    Next, add the code that will be responsible for sending the mock temperature sensor data into our Redpanda source topic.

i = 0
with app.get_producer() as producer:
    while i < 10000:
        sensor_id = random.choice(["Sensor1", "Sensor2", "Sensor3", "Sensor4", "Sensor5"])
        temperature = Temperature(datetime.now(), random.randint(0, 100))
        value = temperature.to_json()

        print(f"Producing value {value}")
        serialized = destination_topic.serialize(
            key=sensor_id, value=value, headers={"uuid": str(uuid.uuid4())}
        )
        producer.produce(
            topic=destination_topic.name,
            headers=serialized.headers,
            key=serialized.key,
            value=serialized.value,
        )
        i += 1
        sleep(random.randint(0, 1000) / 1000)

This generates up to 10,000 records, separated by random time intervals between 0 and 1 second. It also randomly selects a sensor name from a list of 5 options.

    Now, try out the producer by running the following in the command line

    python sensor_stream_producer.py

    You should see data being logged to the console like this:

    [data produced]

    Once you’ve confirmed that it works, stop the process for now (you’ll run it alongside the stream processing process later).

    Creating the stream processor

    The stream processor performs three main tasks: 1) consume the raw temperature readings from the source topic, 2) continuously aggregate the data, and 3) produce the aggregated results to a sink topic.

    Let’s add the code for each of these tasks. In your IDE, create a new file called sensor_stream_processor.py.

    First, add the dependencies as before:

    import os
    import random
    import json
    from datetime import datetime, timedelta
    from dataclasses import dataclass
    import logging
    from quixstreams import Application

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    Let’s also set some variables that our stream processing application needs:

TOPIC = "raw-temp-data" # defines the input topic (must match the topic name used by the producer)
SINK = "agg-temperature" # defines the output topic
WINDOW = 10 # defines the length of the time window in seconds
WINDOW_EXPIRES = 1 # defines, in seconds, how late data can arrive before it is excluded from the window

    We’ll go into more detail on what the window variables mean a bit later, but for now, let’s crack on with defining the main Quix application.

app = Application(
    broker_address='localhost:19092',
    consumer_group="quix-stream-processor",
    auto_offset_reset="earliest",
)

Note that there are a few more application variables this time around, namely consumer_group and auto_offset_reset. To learn more about the interplay between these settings, check out the article "Understanding Kafka's auto offset reset configuration: Use cases and pitfalls".

    Next, define the input and output topics on either side of the core stream processing function and add a function to put the incoming data into a DataFrame.

    input_topic = app.topic(TOPIC, value_deserializer="json")
    output_topic = app.topic(SINK, value_serializer="json")

    sdf = app.dataframe(input_topic)
    sdf = sdf.update(lambda value: logger.info(f"Input value received: {value}"))

    We’ve also added a logging line to make sure the incoming data is intact.

Next, let's add a custom timestamp extractor to use the timestamp from the message payload instead of the Kafka message timestamp. For your aggregations, this basically means that you want to use the time the reading was generated rather than the time it was received by Redpanda. Or in even simpler terms: "Use the sensor's definition of time rather than Redpanda's".

def custom_ts_extractor(value, headers=None, timestamp=None, timestamp_type=None):
    # Note: Quix Streams passes extra arguments (headers, broker timestamp, timestamp type)
    # to the extractor; they are accepted here but not used.

    # Extract the sensor's timestamp and convert it to a datetime object
    dt_obj = datetime.strptime(value["ts"], "%Y-%m-%dT%H:%M:%S.%f")

    # Convert to milliseconds since the Unix epoch for efficient processing with Quix
    milliseconds = int(dt_obj.timestamp() * 1000)
    value["timestamp"] = milliseconds
    logger.info(f"Value of new timestamp is: {value['timestamp']}")

    return value["timestamp"]

# Override the previously defined input_topic variable so that it uses the custom timestamp extractor
input_topic = app.topic(TOPIC, timestamp_extractor=custom_ts_extractor, value_deserializer="json")

    Why are we doing this? Well, we could get into a philosophical rabbit hole about which kind of time to use for processing, but that’s a subject for another article. With the custom timestamp, I just wanted to illustrate that there are many ways to interpret time in stream processing, and you don’t necessarily have to use the time of data arrival.

    Next, initialize the state for the aggregation when a new window starts. It will prime the aggregation when the first record arrives in the window.

def initializer(value: dict) -> dict:

    value_dict = json.loads(value)
    return {
        'count': 1,
        'min': value_dict['value'],
        'max': value_dict['value'],
        'mean': value_dict['value'],
    }

    This sets the initial values for the window. In the case of min, max, and mean, they are all identical because you’re just taking the first sensor reading as the starting point.

    Now, let’s add the aggregation logic in the form of a “reducer” function.

def reducer(aggregated: dict, value: dict) -> dict:
    aggcount = aggregated['count'] + 1
    value_dict = json.loads(value)
    return {
        'count': aggcount,
        'min': min(aggregated['min'], value_dict['value']),
        'max': max(aggregated['max'], value_dict['value']),
        'mean': (aggregated['mean'] * aggregated['count'] + value_dict['value']) / (aggregated['count'] + 1)
    }

    This function is only necessary when you’re performing multiple aggregations on a window. In our case, we’re creating count, min, max, and mean values for each window, so we need to define these in advance.

    Next up, the juicy part — adding the tumbling window functionality:

### Define the window parameters such as type and length
sdf = (
    # Define a tumbling window of 10 seconds
    sdf.tumbling_window(timedelta(seconds=WINDOW), grace_ms=timedelta(seconds=WINDOW_EXPIRES))

    # Create a "reduce" aggregation with "reducer" and "initializer" functions
    .reduce(reducer=reducer, initializer=initializer)

    # Emit results only for closed 10 second windows
    .final()
)

### Apply the window to the Streaming DataFrame and define the data points to include in the output
sdf = sdf.apply(
    lambda value: {
        "time": value["end"],  # Use the window end time as the timestamp for the message sent to the 'agg-temperature' topic
        "temperature": value["value"],  # Send a dictionary of {count, min, max, mean} values for the temperature parameter
    }
)

    This defines the Streaming DataFrame as a set of aggregations based on a tumbling window — a set of aggregations performed on 10-second non-overlapping segments of time.

    Tip: If you need a refresher on the different types of windowed calculations, check out this article: “A guide to windowing in stream processing”.

    Finally, produce the results to the downstream output topic:

    sdf = sdf.to_topic(output_topic)
    sdf = sdf.update(lambda value: logger.info(f"Produced value: {value}"))

if __name__ == "__main__":
    logger.info("Starting application")
    app.run(sdf)

    Note: You might wonder why the producer code looks very different to the producer code used to send the synthetic temperature data (the part that uses with app.get_producer() as producer()). This is because Quix uses a different producer function for transformation tasks (i.e. a task that sits between input and output topics).

    As you might notice when following along, we iteratively change the Streaming DataFrame (the sdf variable) until it is the final form that we want to send downstream. Thus, the sdf.to_topic function simply streams the final state of the Streaming DataFrame back to the output topic, row-by-row.

    The producer function on the other hand, is used to ingest data from an external source such as a CSV file, an MQTT broker, or in our case, a generator function.

    Run the streaming applications

    Finally, you get to run our streaming applications and see if all the moving parts work in harmony.

    First, in a terminal window, start the producer again:

    python sensor_stream_producer.py

    Then, in a second terminal window, start the stream processor:

    python sensor_stream_processor.py

    Pay attention to the log output in each window, to make sure everything is running smoothly.

You can also check the Redpanda console to make sure that the aggregated data is being streamed to the sink topic correctly (you'll find the topic browser at: http://localhost:8080/topics).

    Screenshot by author

    Wrapping up

What you've tried out here is just one way to do stream processing. Naturally, there are heavy-duty tools such as Apache Flink and Apache Spark Streaming, which have also been covered extensively online. But those are predominantly Java-based tools. Sure, you can use their Python wrappers, but when things go wrong, you'll still be debugging Java errors rather than Python errors. And Java skills aren't exactly ubiquitous among data folks, who are increasingly working alongside software engineers to tune stream processing algorithms.

    In this tutorial, we ran a simple aggregation as our stream processing algorithm, but in reality, these algorithms often employ machine learning models to transform that data — and the software ecosystem for machine learning is heavily dominated by Python.

An oft-overlooked fact is that Python is the lingua franca for data specialists, ML engineers, and software engineers to work together. It's even better than SQL because you can use it to do non-data-related things like make API calls and trigger webhooks. That's one of the reasons why libraries like Faust, Bytewax and Quix evolved: to bridge the so-called impedance gap between these different disciplines.

    Hopefully, I’ve managed to show you that Python is a viable language for stream processing, and that the Python ecosystem for stream processing is maturing at a steady rate and can hold its own against the older Java-based ecosystem.


    Aggregating Real-time Sensor Data with Python and Redpanda was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.
