How to Create an AI Agent to Manage Your Email Inbox and Reply to Your Cold Email: Code Included (Part-2)


Introduction

In part 1 of creating an AI agent to manage your inbox and cold emails, we saw how to build an AI agent using LangChain. It works well, but it only runs when you run the code yourself, and it prints its replies to the CLI. Is it really autonomous? 🤔 I don’t think so, because it still requires human interaction to run the agent, and we don’t have any storage where we can see all the generated email replies.

What if we could run our agent whenever we receive a reply to a cold email, have it automatically classify the reply and generate a response, and then see all the generated replies in a beautiful UI? That would make it look more like an agent, right? 👀 So let’s do it!

Workflow

Smartlead provides webhooks that are triggered on different types of events, such as email reply, email sent, email open, etc. We can use these webhooks to run our agent on a specific event. We will use Zapier to create a webhook catch URL that the Smartlead webhooks will call, and Zapier will then run our agent.

We can’t directly trigger the agent’s Python code from Zapier, so we will create a serverless function that exposes a few API routes for different purposes: /generate-email, /get-emails and /get-live-logs. You can also use a regular server for this, but a serverless deployment is quicker to set up and easier to manage since we only have 3 routes.

We will use PostgreSQL to store all the previously generated emails so that users can keep track of past emails and we will use Streamlit to create a user interface to show previously generated email responses and live agent logs coming from our serverless.

Here is what the new workflow will look like 👇

Let’s discuss how we can build it in detail!

Creating Backend Serverless

First of all, let’s create our backend, where we will run our agent and interact with the database. We will create 3 routes:

  • /generate-email: To run our agent and generate an email response if needed
  • /get-emails: To get previously generated emails from the PostgreSQL database
  • /get-live-logs: To get the live logs of the agent if it is running for any email

Create a file called “constants.py” and store your environment variables in it


SMARTLEAD_API_KEY = "d1eefe52-xxxx-xxx-xxxxxx"
OPENAI_KEY = "sk-xxxxxxx"
APOLLO_API_KEY = "Gxxxxxx"
# I am using local postgreSQL server
DATABASE_URL = "postgresql://postgres:shivam@localhost:5432/coldemail_fastapi"
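
Since these values are secrets, one option is to read them from real environment variables instead of hardcoding them in the file. Below is a minimal sketch, assuming the same four variable names as above. The names load_settings and REQUIRED_KEYS are hypothetical and used only for illustration; they are not part of the original project.

```python
import os

# Hypothetical helper: the four settings that constants.py defines above.
REQUIRED_KEYS = ("SMARTLEAD_API_KEY", "OPENAI_KEY", "APOLLO_API_KEY", "DATABASE_URL")


def load_settings(env=None):
    """Return the required settings as a dict, raising if any is missing."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {key: env[key] for key in REQUIRED_KEYS}
```

With this approach, constants.py could call load_settings() once at import time and expose the values under the same names, so the rest of the code would not need to change.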

Setting up Database Connection

Now let’s set up our database connection. We will use SQLAlchemy to connect our PostgreSQL database to the server.

Create a file called “dbClient.py” and add this code in it:


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from schema import Base, Item
from constants import DATABASE_URL
    
# Create SQLAlchemy engine
engine = create_engine(DATABASE_URL)

# Create a SQLAlchemy session
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create database tables
Base.metadata.create_all(bind=engine)
db = SessionLocal()

We will use FastAPI to create a REST API server, and we will use Mangum to create a serverless handler that you can use to deploy the application on platforms like AWS Lambda.

Let’s first import the required dependencies and create our first route in the “main.py” file:


from fastapi import FastAPI
# from app.api.api_v1.api import router as api_router
from mangum import Mangum
from agent import run_agent
from pydantic import BaseModel
from dbClient import db
from schema import Item
from logs import logs,reset_logs

app = FastAPI()
handler = Mangum(app,lifespan="off")

@app.get("/")
async def root():
    return {"message": "Hello World!"}

We will store this information about any agent-generated email in our database:

  • id: A unique id for each entry (row)
  • email: Email of the lead
  • category: The email category given by our agent
  • logs: Email agent logs to track the generation process and the actions it takes to generate each email response
  • message: The lead’s reply for which the response was generated
  • generated_response: The final response generated by the agent for the given message
  • campaign_id: Campaign id of the cold email campaign from Smartlead
  • status: Status of the process (RUNNING or COMPLETED), which can be used to filter out live agent processes.

Create a file called “schema.py” and add the schema for our database:


from sqlalchemy import Column, Integer, String, ARRAY
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Item(Base):
    __tablename__ = "emails"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, index=True)
    category = Column(String)
    logs = Column(ARRAY(String))
    generated_response = Column(String)
    message = Column(String)
    campaign_id = Column(Integer)
    status = Column(String)

Creating Logs for our Agent

For every agent event, we will add an entry to our logs array so that users can keep track of agent activity in real time. Let’s implement an “add_to_logs” function that adds a new entry to the current agent logs. Every log entry is a string containing a timestamp and some information about that activity.
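
To keep every entry in the same shape, the timestamp prefix can live in one small helper. This is only a sketch: format_log is a hypothetical helper name, and the timestamp format is the one the tools later in this post use when they write their log entries.

```python
from datetime import datetime


def format_log(message, now=None):
    """Build one log entry: a timestamp, a colon, then the message."""
    now = datetime.now() if now is None else now
    return f"{now.strftime('%y-%m-%d %H:%M:%S')}: {message}"


print(format_log("Categorizing Email...", datetime(2024, 1, 2, 3, 4, 5)))
# 24-01-02 03:04:05: Categorizing Email...
```

Passing the time in as an optional argument keeps the helper easy to test, while normal callers can simply write format_log("some message").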

I have created a folder called “helpers”, and in that folder I have created a file called “addLogs.py”:


from sqlalchemy.orm import Session
from schema import Item
from dbClient import db

def add_to_logs(email: str, new_log: str):
    # Retrieve the most recent Item for this lead from the database
    item = db.query(Item).filter(Item.email == email).order_by(Item.id.desc()).first()
    print("Adding logs...", new_log)
    if item:
        # Reassign the list (instead of appending in place) so SQLAlchemy detects the change
        item.logs = (item.logs or []) + [new_log]
        # Update the Item object in the database
        db.commit()
        print("Updated logs", item.logs)


Let’s add our agent tools in a file called “tools.py” 🔨


from langchain.pydantic_v1 import BaseModel, Field
from typing import Type, List
from langchain.tools import BaseTool
import requests
import json
from constants import APOLLO_API_KEY, OPENAI_KEY
from openai import OpenAI as OpenAIPython
from logs import logs
from datetime import datetime
from helpers.addLogs import add_to_logs

client = OpenAIPython(
    # This is the default and can be omitted
    api_key=OPENAI_KEY,
)

class EmailWriterToolInput(BaseModel):
    latest_reply: str = Field(description="Latest reply from the prospect")
    conversation_history: str = Field(description="Array of conversation history")
    sender: str = Field(description="Name of sender")
    company_info: str = Field(description="Information about sender's company")
    email: str = Field(description="lead email")

class EmailWriterTool(BaseTool):
    name = "email_writer_tool"
    description = "use this tool when you have been given an email and you have to construct a reply for it"
    args_schema: Type[BaseModel] = EmailWriterToolInput

    def _run(self, latest_reply: str, conversation_history: str, sender: str, company_info: str, email: str):
        add_to_logs(email=email,new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 📝 Generating Email Response...")
        print("📝 Generating Email Response...")
        headers = {
            "Content-Type": "application/json"
        }
        data = {
            "params": {
                "client_email": latest_reply,
                "sender":sender,
                "conversation_history":conversation_history,
                "company_description":company_info
            },
            "project": "2b9xxxxx-xxxx-xxxxxx-xxx"
        }

        res = requests.post("https://api-f1db6c.stack.tryrelevance.com/latest/studios/xxxx-xxxxxx-xxxxx/trigger_limited",data=json.dumps(data),headers=headers)
        res = res.json()
        return res["output"]["answer"]

    def _arun(self, url: str):
        raise NotImplementedError(
            "email writer tool does not support async")
    
class CompanySearchToolInput(BaseModel):
    email: str = Field(description="Email of sender")
    category: str = Field(description="Category of email")

class CompanySearchTool(BaseTool):
    name = "company_search_tool"
    description = "use this tool when you want to get information about any company"
    args_schema: Type[BaseModel] = CompanySearchToolInput

    def _run(self, email: str, category: str):
        add_to_logs(email=email,new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 🔍 Searching for company information...")
        print("🔍 Searching for company information...")
        data = {
          "api_key":APOLLO_API_KEY,
          "email":email
        }
        response = requests.post("https://api.apollo.io/v1/people/match",data=data)
        response = response.json()
        # Use .get() so a missing person or organization does not raise a KeyError
        organization = (response.get("person") or {}).get("organization") or {}
        return organization.get("short_description") or "No data"

    def _arun(self, url: str):
        raise NotImplementedError(
            "company_search_tool does not support async")
    

class CategorizeEmailInput(BaseModel):
    conversation: str = Field(description="Email conversation array")
    email:str = Field(description="Lead email")

class CategorizeEmailTool(BaseTool):
    name = "email_categorizer_tool"
    description = "use this tool when you have an email conversation history and you want to categorize the email"
    args_schema: Type[BaseModel] = CategorizeEmailInput

    def _run(self, conversation: str,email: str):
        add_to_logs(email=email,new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 📩 Categorizing Email...")
        print("📩 Categorizing Email...")
        prompt = f"""
            Email Conversation History:
            ---
            {conversation}
            ---
            You are given an array of the conversation between Rohan Sawant and a client.
            Your goal is to categorize this email, based on the conversation history, into one of the given categories:

            1. Meeting_Ready_Lead: They have shown positive intent and are interested in getting on a call
            2. Power: If they’re interested and we want to push for a call
            3. Question: If they have a question about anything
            4. Unsubscribe: They want to unsubscribe from our email list
            5. OOO: They are out of office
            6. No_Longer_Works: They no longer work at the company
            7. Not_Interested: They are not interested
            8. Info: These are emails that don't fit into any of the above categories.

            Note: Your final response MUST BE the category name ONLY

            RESPONSE:
        """
        message = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "user", "content": prompt}
            ]
        )
        category = message.choices[0].message.content
        add_to_logs(email=email,new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: Email successfully categorized as {category}")
        print(f"Email successfully categorized as {category}")
        return category

    def _arun(self):
        raise NotImplementedError(
            "categorise_email does not support async")

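The categorizer returns whatever text the model produces, so it may be worth checking that text against the eight category names before storing it. Here is a minimal sketch, assuming that an unrecognized answer should fall back to Info. The names normalize_category and ALLOWED_CATEGORIES are hypothetical and not part of the original code.

```python
# The eight category names listed in the categorizer prompt.
ALLOWED_CATEGORIES = (
    "Meeting_Ready_Lead", "Power", "Question", "Unsubscribe",
    "OOO", "No_Longer_Works", "Not_Interested", "Info",
)


def normalize_category(raw, default="Info"):
    """Map the model's raw text to one of the known category names."""
    # Drop surrounding whitespace, quotes and a trailing period, then compare case-insensitively.
    cleaned = raw.strip().strip("\"'.").lower()
    for name in ALLOWED_CATEGORIES:
        if cleaned == name.lower():
            return name
    return default


print(normalize_category(" question.\n"))  # Question
```

In the tool, this could be applied to the category right before it is logged and returned.
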
Creating API Routes

Now that we have completed the initial setup, it’s time to set up the API routes we discussed earlier.

Let’s create a route to run our agent. It takes the lead’s email and the campaign id in the request body; we pass this information to our agent and run it 🤖


class Email_Body(BaseModel):
    email: str
    campaign_id: int

@app.post("/generate-email")
async def generate_email(item: Email_Body):
    # Create entry in DB
    email_data = Item(email=item.email,category="",logs=[],generated_response="",message="",campaign_id=item.campaign_id,status="RUNNING")
    db.add(email_data)
    db.commit()
    db.refresh(email_data)
    # Run agent and get the response
    response = run_agent(item.campaign_id,item.email)
    # Add the agent response to the entry we just created
    email_data.category = response['response']['category']
    email_data.generated_response = response['response']['reply']
    email_data.message = response['message']
    email_data.status = "COMPLETED"
    # Update the Item object in the database
    db.commit()
    db.refresh(email_data)
    return response

Let’s create a route to get all the previously generated emails so that we can show them in our Streamlit application.


@app.get("/get-emails")
async def get_emails():
    records = db.query(Item).all()
    records_json = [record.__dict__ for record in records]
    return records_json

And finally, create a route to get the live sessions of the agent so that the user can track agent activity in real time.


@app.get("/get-live-logs")
async def get_live_logs():
    items = db.query(Item).filter(Item.status == "RUNNING").all()
    records_json = [record.__dict__ for record in items]
    return records_json
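
Both read routes return record.__dict__, which also carries SQLAlchemy's internal _sa_instance_state attribute. If you would rather control exactly which fields leave the server, one option is to copy the known columns into a plain dict. This is a sketch under assumptions: record_to_dict and EMAIL_FIELDS are hypothetical names, and a SimpleNamespace stands in for an Item row so that the example runs without a database.

```python
from types import SimpleNamespace

# Column names taken from the Item model in schema.py.
EMAIL_FIELDS = ("id", "email", "category", "logs", "generated_response",
                "message", "campaign_id", "status")


def record_to_dict(record):
    """Copy only the known columns from a record into a plain dict."""
    return {field: getattr(record, field) for field in EMAIL_FIELDS}


# Stand-in object for illustration; in the server this would be an Item row.
row = SimpleNamespace(id=1, email="lead@example.com", category="Question",
                      logs=[], generated_response="", message="",
                      campaign_id=100, status="RUNNING",
                      _sa_instance_state=object())
print(record_to_dict(row)["status"])  # RUNNING
```

In the routes, the list comprehension would then become [record_to_dict(record) for record in records].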

And we have successfully created a server 🚀! You can host this code on any serverless platform like AWS Lambda or run it locally for personal use.

Streamlit User Interface

Now let’s build a user interface where the user can see all the previously generated emails and the live activity of the agent while it’s running. We will use Streamlit to create this user interface, with 2 pages: 1) History 2) Live Logs. So let’s get started!

Creating The Interface

First, create a Python project and install the required dependencies:


pip install streamlit requests

Create a file called “main.py” and add the code below:


import streamlit as st
import requests
live_data = []

# Define a function to display logs when the dropdown is clicked
def show_logs(logs):
  for log in logs:
    st.markdown(f'''
    ```
    {log}
    ```
''')

def main():
  global live_data
  # Custom CSS for some labels and categories
  custom_css = """
  
  """
  st.markdown(custom_css, unsafe_allow_html=True)
  # Title and description
  with st.sidebar:
    selected_tab = st.sidebar.selectbox("Navigation", ["Live Logs", "History"], index=0)
  if selected_tab == "Live Logs":
    pass  # Display live logs here
  elif selected_tab == "History":
    pass  # Display history here

if __name__ == "__main__":
  main()


Creating Live Agent Logs Page

We will create one page to show live agent activity, where the user can see all running instances of our agent in real time and track their activity.

We will make a call to the “/get-live-logs” API endpoint that we just created in our server to get the agent instances that are running.

Let’s write the code to show live agent logs. Add the code below in the first if condition:


st.title("Live Agent Logs")
st.write("Here you can see the live logs from running agent")
# Get the live logs from server
response = requests.get("https://yourserver.com/beta/get-live-logs")
live_data = response.json()
refresh_button_placeholder = st.empty()
refresh_button_placeholder.button("Refresh", on_click=refresh_card)
# Display cards
for i, card in enumerate(live_data):
  with st.container(border=True):
      st.subheader("Email: {}".format(card["email"]))
      # Show the current status of the agent run
      st.markdown(f"Status: {card['status']}", unsafe_allow_html=True)
      st.write("Category:")
      st.markdown(f"```\n{card['category']}\n```")
      # Display email and reply
      st.write("Message:")
      st.markdown(f"```\n{card['message']}\n```")
      st.write("Generated Response:")
      st.markdown(f"```\n{card['generated_response'] if card['generated_response'] != '' else 'No Response'}\n```")
      # Whenever the user clicks on the logs expander, show the logs
      with st.expander("Logs"):
          show_logs(card["logs"])

We will also add a refresh button to the live logs page so that the user can click it to get the latest logs. Add this function at module level, above main():


def refresh_card():
    global live_data
    response = requests.get("https://yourserver.com/beta/get-live-logs")
    live_data = response.json()

From the sidebar, you can select the Live Logs page to see the live agent logs.

Creating Email History Page

We will create one more page to show all the previously generated email responses. Here we will use the “/get-emails” API endpoint we created in our server.

Now let’s write the code to show previously generated emails. Add the code below in the elif condition:


st.title("Cold Email Agent")
st.write("An autonomous agent which can reply to your cold emails in your tone")
# Get previous emails
response = requests.get("https://yourserver.com/beta/get-emails")
response = response.json()
# Display cards
for i, card in enumerate(response):
    with st.container(border=True):
        st.subheader("Email: {}".format(card["email"]))
        # Show the status of the agent run
        st.markdown(f"Status: {card['status']}", unsafe_allow_html=True)
        st.write("Category:")
        st.markdown(f"```\n{card['category']}\n```")
        # Display email and reply
        st.write("Message:")
        st.markdown(f"```\n{card['message']}\n```")
        st.write("Generated Response:")
        st.markdown(f"```\n{card['generated_response'] if card['generated_response'] != '' else 'No Response'}\n```")
        with st.expander("Logs"):
            show_logs(card["logs"])

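The live logs page and the history page render each card in the same way, so the shared part can be pulled into one plain function. The sketch below is only an illustration: card_sections is a hypothetical helper, and it assumes each card is a dict with the keys returned by the server.

```python
def card_sections(card):
    """Return the (label, text) pairs shown for one email card."""
    # An empty or missing response is shown as "No Response", as on both pages.
    response = card.get("generated_response") or "No Response"
    return [
        ("Status", card.get("status", "")),
        ("Category", card.get("category", "")),
        ("Message", card.get("message", "")),
        ("Generated Response", response),
    ]


example = {"status": "RUNNING", "category": "", "message": "Hi", "generated_response": ""}
for label, text in card_sections(example):
    print(f"{label}: {text}")
```

Each page could then loop over card_sections(card) and pass the label and text to st.write and st.markdown, which keeps the two pages from drifting apart.
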
And now you can see the previously generated emails in the user interface.

You can also see the logs after clicking the “Logs” expander.

And we have successfully built both the frontend and the backend for our workflow 🎉!

Now it’s time to combine the building blocks using Zapier, and we will be ready to use our agent!

Zapier Integration

Let’s set up our first zap, where we will capture the webhook event coming from Smartlead. Here are the initial steps you need to perform to set up the Zapier webhooks.

Step-1: Create an account on Zapier and create a new zap.

Step-2: Click on “Add Action”, search for “Webhooks by Zapier” and select it.

Step-3: Select “Catch Hook” as the event and create a webhook URL. This is your webhook catch URL, which you will pass as the webhook URL in the Smartlead settings.

Step-4: Go to your Smartlead dashboard, open Settings → Webhooks and click on “Add Webhook” to create a new webhook.

Step-5: Give your webhook a name and add the Zapier catch URL in the webhook URL field. Select User as the association type so that your webhook runs for every lead.

Step-6: Smartlead provides webhooks for different kinds of events like “Email Reply”, “Email click”, “Email sent”, etc. Here we want to run our agent when we get a reply, so we will select “Email Reply” as the event.

The webhook payload for the “Email Reply” event will look like this:


{
  "webhook_id": 100,
  "webhook_name": "Test",
  "campaign_status": "COMPLETED",
  "stats_id": "id",
  "from_email": "[email protected]",
  "subject": "Proposal for Collaboration - smartlead.ai",
  "sent_message": {
    "message_id": "",
    "html": "Test",
    "text": "Test",
    "time": "2023-04-04T08:31:13.638+00:00"
  },
  "to_email": "[email protected]",
  "to_name": "Support Test",
  "event_timestamp": "2023-04-04T08:31:22+00:00",
  "reply_message": {
    "message_id": "",
    "html": "##- Please type your reply above this line -##",
    "text": "##- Please type your reply above this line -##",
    "time": "2023-04-04T08:31:22+00:00"
  },
  "campaign_name": "Link insertion",
  "campaign_id": 100,
  "client_id": null,
  "sequence_number": 1,
  "secret_key": "secretkey",
  "app_url": "https://app.smartlead.ai/app",
  "ui_master_inbox_link": "https://app.smartlead.ai/app",
  "description": "[email protected] replied to Email 1 for campaign - Link insertion ",
  "metadata": {
    "webhook_created_at": "2023-09-26T10:48:56.598Z"
  },
  "webhook_url": "https://webhook.site/5168fa7f-0asd-465a-8114-111da474a77",
  "event_type": "EMAIL_REPLY"
}

This is what your final webhook config will look like 👇

Step-7: To test the webhook, click on the “Send Test To Webhook” button. Then, on the Zapier dashboard, click on the step we just created, go to the test section and click on “Test webhook”. You will see the information coming from the Smartlead webhook in your Zapier catch hook.

Step-8: Finally, we will trigger our API after this step. Click on add step, select “Webhooks by Zapier” again, and select “POST” as the event. Click on continue.

Step-9: Add your server URL in the URL field and specify the body data that you want to send to the server. In our case, we are going to send the lead’s email and the campaign id, as we discussed before.

Click on continue, and we have finally completed the Zapier configuration! ⚡
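
The body configured in Step-9 can be pictured as a small mapping from the webhook event to the two fields that the /generate-email route expects. The sketch below is an illustration only: build_agent_payload is a hypothetical function, and it assumes that the “to_email” field of the reply event holds the lead’s address, which you should verify against your own test payload.

```python
def build_agent_payload(event):
    """Pick the two fields the /generate-email route expects from a webhook event."""
    if event.get("event_type") != "EMAIL_REPLY":
        raise ValueError("Only EMAIL_REPLY events should trigger the agent")
    return {
        # Assumption: "to_email" holds the lead's address in the reply event.
        "email": event["to_email"],
        "campaign_id": int(event["campaign_id"]),
    }


event = {"event_type": "EMAIL_REPLY", "to_email": "lead@example.com", "campaign_id": 100}
print(build_agent_payload(event))
# {'email': 'lead@example.com', 'campaign_id': 100}
```

In Zapier itself you would map these two fields in the POST step rather than write code, but the resulting request body has the same shape.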

Now whenever a lead replies to your email, the API will be triggered and it will run your agent. All your generated email replies will be stored in our PostgreSQL database, and you can see them in the Streamlit dashboard we just created.

So now it looks like an actual autonomous agent that can classify your emails, get information about leads, generate replies to your emails in your own style and tone, and also reply to leads using the Smartlead API 🚀!

Conclusion

As we discussed at the start, the agent we created in part 1 was not properly autonomous because it still required human interaction to run and stop it. So we created a server to run our agent, triggered by Smartlead webhooks, so that it runs automatically whenever a lead replies to your email. We also created a beautiful user interface to track the previously generated emails and the real-time agent activity.

So if you are a small business or an individual who uses cold email to build connections and networks, this agent workflow is a good fit for you because it handles everything from classifying an email to replying to your leads in your tone and writing style. Doesn’t it look like a miracle? 👀

Want to Automate Your Workflows?

So, whether you are a small team looking for clients, a job seeker looking for better opportunities, a freelancer looking for clients or a large organization seeking more clients, cold emails are one of the best ways to get more connections, and AI agents can automate this work for you.

If you are looking to build custom AI agents to automate workflows like this, book a call with us and we will be happy to turn your ideas into reality and make your life easier.

Thanks for reading 😄.

Behind the Blog 👀
Shivam Danawale
Writer

Shivam is an AI Researcher & Full Stack Engineer at Ionio.

Rohan Sawant
Editor

Rohan is the Founder & CEO of Ionio. I make everyone write all these nice articles... 🥵