How to Create an AI Agent to Manage Your Email Inbox and Reply to Your Cold Email: Code Included (Part-2)
This is part 2 of our series on creating an AI agent to manage and reply to your cold emails. In part 1, we built an agent using LangChain that can classify and reply to your cold emails in your tone and style. In this post, we will make it more autonomous by triggering it whenever someone replies to your email using Smartlead webhooks, and we will build a user interface to track the agent’s live activity and all previously generated email responses.
In part 1 of creating an AI agent to manage your inbox and cold emails, we saw how to build an AI agent using LangChain. It works well, but it only runs when you execute the code, and it replies in the CLI. Is it really autonomous? 🤔 I don’t think so, because it still requires a human to run it, and we have no storage where we can review all the generated email replies.
What if our agent ran automatically whenever we receive a reply to a cold email, classified it, generated the response, and then let us review all the generated replies in a beautiful UI? That would make it feel much more like an agent, right? 👀 So let’s do it!
Workflow
Smartlead provides webhooks that fire on different types of events, such as email reply, email sent, and email open. We can use these webhooks to run our agent on a specific event. We will use Zapier to create a webhook catch URL that Smartlead’s webhooks will trigger, and run our agent from Zapier.
We can’t trigger the agent’s Python code directly from Zapier, so we will create a serverless application with a few API routes for different purposes: /generate-email, /get-emails, and /get-live-logs. You could also use a regular server for this, but serverless is a bit faster to ship and easier to manage since we only have 2-3 routes.
We will use PostgreSQL to store all previously generated emails so that users can keep track of past replies, and Streamlit to build a user interface showing the generated email responses and the live agent logs coming from our serverless backend.
Here is how the new workflow will look 👇
Let’s discuss how we can make it in detail!
Creating Backend Serverless
First of all, let’s create our backend, where we will run our agent and interact with the database. We will create 3 routes:
/generate-email: To run our agent and generate an email response if needed
/get-emails: To fetch previously generated emails from the PostgreSQL database
/get-live-logs: To get the live logs of any agent run currently in progress
Create a file called “constants.py” and store your API keys and configuration in it:
SMARTLEAD_API_KEY = "d1eefe52-xxxx-xxx-xxxxxx"
OPENAI_KEY = "sk-xxxxxxx"
APOLLO_API_KEY = "Gxxxxxx"
# I am using a local PostgreSQL server
DATABASE_URL = "postgresql://postgres:shivam@localhost:5432/coldemail_fastapi"
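Hardcoding secrets works for a local demo, but reading them from real environment variables is safer if you plan to commit or deploy this code. A sketch of the same file using `os.environ` (variable names mirror constants.py; the defaults are placeholders):

```python
import os

# Read secrets from the environment instead of hardcoding them.
# The second argument is a fallback used when the variable is unset.
SMARTLEAD_API_KEY = os.environ.get("SMARTLEAD_API_KEY", "")
OPENAI_KEY = os.environ.get("OPENAI_KEY", "")
APOLLO_API_KEY = os.environ.get("APOLLO_API_KEY", "")
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://postgres:postgres@localhost:5432/coldemail_fastapi",
)
```

The rest of the code can keep importing these names from constants.py unchanged.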
Setting up Database Connection
Now let’s set up our database connection. We will use SQLAlchemy to connect our PostgreSQL database to the server.
Create a file called “dbClient.py” and add this code in it:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from schema import Base, Item
from constants import DATABASE_URL
# Create SQLAlchemy engine
engine = create_engine(DATABASE_URL)
# Create a SQLAlchemy session
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create database tables
Base.metadata.create_all(bind=engine)
db = SessionLocal()
We will use FastAPI to create a REST API server and Mangum to create a serverless handler, which lets you deploy the application on platforms like AWS Lambda.
Let’s first import the required dependencies and create our first route in the “main.py” file:
from fastapi import FastAPI
from mangum import Mangum
from agent import run_agent
from pydantic import BaseModel
from dbClient import db
from schema import Item

app = FastAPI()
handler = Mangum(app, lifespan="off")
@app.get("/")
async def root():
    return {"message": "Hello World!"}
We will store the following information about each agent-generated email in our database:
id: A unique id for each entry (row)
email: The lead’s email address
category: The email category assigned by our agent
logs: Agent logs that track the generation process and the actions taken for each email response
message: The lead’s reply for which the response was generated
generated_response: The final response generated by the agent for the given message
campaign_id: The campaign id of the cold email campaign from Smartlead
status: The status of the process (RUNNING or COMPLETED), which is used to filter live agent runs
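Since the status field only ever holds RUNNING or COMPLETED, a str-based Enum (an optional refinement, not part of the original schema) keeps those values consistent across the codebase:

```python
from enum import Enum

class ProcessStatus(str, Enum):
    # Subclassing str lets values be stored directly in a String column
    # and compared against plain strings
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"

# Existing filters like Item.status == "RUNNING" keep working:
print(ProcessStatus.RUNNING == "RUNNING")  # True
```

You would then write `status=ProcessStatus.RUNNING.value` instead of a bare string literal.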
Create a file called “schema.py” and add the schema for our database:
from sqlalchemy import Column, Integer, String, ARRAY
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Item(Base):
    __tablename__ = "emails"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, index=True)
    category = Column(String)
    logs = Column(ARRAY(String))
    generated_response = Column(String)
    message = Column(String)
    campaign_id = Column(Integer)
    status = Column(String)
Creating Logs for our Agent
For every agent event, we will append an entry to the logs array so that users can track agent activity in real time. Let’s implement an “add_to_logs” function that appends a new entry to the current agent logs. Each log is a string containing a datetime and some information about that activity.
I have created a folder called “helpers” and, inside it, a file called “add_to_logs.py”:
from sqlalchemy.orm import Session
from schema import Item
from dbClient import db

def add_to_logs(email: str, new_log: str):
    # Retrieve the Item object from the database
    item = db.query(Item).filter(Item.email == email).first()
    print("Adding log...", new_log)
    if item:
        # Reassign (rather than append in place) so SQLAlchemy
        # detects the change on the ARRAY column
        item.logs = item.logs + [new_log]
        db.commit()
        print("Updated logs", item.logs)
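Each tool in the next file builds the same timestamp prefix by hand; a tiny helper (hypothetical, not part of the original code) could centralize that formatting:

```python
from datetime import datetime

def make_log_entry(activity: str) -> str:
    # Matches the "yy-mm-dd HH:MM:SS: <activity>" format used by the tools
    return f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: {activity}"

entry = make_log_entry("📝 Generating Email Response...")
# e.g. add_to_logs(email=email, new_log=entry)
```

This keeps the log format in one place if you later decide to change it.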
Let’s add our agent tools in a file called “tools.py” 🔨
from langchain.pydantic_v1 import BaseModel, Field
from typing import Type, List
from langchain.tools import BaseTool
import requests
import json
from constants import APOLLO_API_KEY, OPENAI_KEY
from openai import OpenAI as OpenAIPython
from datetime import datetime
from helpers.add_to_logs import add_to_logs

client = OpenAIPython(api_key=OPENAI_KEY)
class EmailWriterToolInput(BaseModel):
    latest_reply: str = Field(description="Latest reply from the prospect")
    conversation_history: str = Field(description="Array of conversation history")
    sender: str = Field(description="Name of sender")
    company_info: str = Field(description="Information about the sender's company")
    email: str = Field(description="Lead email")

class EmailWriterTool(BaseTool):
    name = "email_writer_tool"
    description = "use this tool when you are given an email and have to construct a reply to it"
    args_schema: Type[BaseModel] = EmailWriterToolInput

    def _run(self, latest_reply: str, conversation_history: str, sender: str, company_info: str, email: str):
        add_to_logs(email=email, new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 📝 Generating Email Response...")
        print("📝 Generating Email Response...")
        headers = {"Content-Type": "application/json"}
        data = {
            "params": {
                "client_email": latest_reply,
                "sender": sender,
                "conversation_history": conversation_history,
                "company_description": company_info,
            },
            "project": "2b9xxxxx-xxxx-xxxxxx-xxx",
        }
        res = requests.post(
            "https://api-f1db6c.stack.tryrelevance.com/latest/studios/xxxx-xxxxxx-xxxxx/trigger_limited",
            data=json.dumps(data),
            headers=headers,
        )
        res = res.json()
        return res["output"]["answer"]

    def _arun(self, url: str):
        raise NotImplementedError("email_writer_tool does not support async")
class CompanySearchToolInput(BaseModel):
    email: str = Field(description="Email of the lead")
    category: str = Field(description="Category of email")

class CompanySearchTool(BaseTool):
    name = "company_search_tool"
    description = "use this tool when you want to get information about any company"
    args_schema: Type[BaseModel] = CompanySearchToolInput

    def _run(self, email: str, category: str):
        add_to_logs(email=email, new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 🔍 Searching for company information...")
        print("🔍 Searching for company information...")
        data = {"api_key": APOLLO_API_KEY, "email": email}
        response = requests.post("https://api.apollo.io/v1/people/match", data=data)
        response = response.json()
        # Guard against missing person/organization data in the response
        organization = (response.get("person") or {}).get("organization")
        if organization and organization.get("short_description"):
            return organization["short_description"]
        return "No data"

    def _arun(self, url: str):
        raise NotImplementedError("company_search_tool does not support async")
class CategorizeEmailInput(BaseModel):
    conversation: str = Field(description="Email conversation array")
    email: str = Field(description="Lead email")

class CategorizeEmailTool(BaseTool):
    name = "email_categorizer_tool"
    description = "use this tool when you have an email conversation history and want to categorize the email"
    args_schema: Type[BaseModel] = CategorizeEmailInput

    def _run(self, conversation: str, email: str):
        add_to_logs(email=email, new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 📩 Categorizing Email...")
        print("📩 Categorizing Email...")
        prompt = f"""
Email Conversation History:
---
{conversation}
---
You are given an array of conversation between Rohan Sawant and a client.
Your goal is to categorize this email, based on the conversation history, into one of the given categories:
1. Meeting_Ready_Lead: They have shown positive intent and are interested in getting on a call
2. Power: They're interested and we want to push for a call
3. Question: They have a question about anything
4. Unsubscribe: They want to unsubscribe from our email list
5. OOO: They are out of office
6. No_Longer_Works: They no longer work at the company
7. Not_Interested: They are not interested
8. Info: Emails that don't fit into any of the above categories
Note: Your final response MUST BE the category name ONLY
RESPONSE:
"""
        message = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
        )
        category = message.choices[0].message.content
        add_to_logs(email=email, new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: Email successfully categorized as {category}")
        print(f"Email successfully categorized as {category}")
        return category

    def _arun(self):
        raise NotImplementedError("email_categorizer_tool does not support async")
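Even with the "category name ONLY" instruction, the model can occasionally return extra text. A small validation step (a hypothetical helper, not part of the original tool) guards against that by falling back to Info:

```python
# The eight categories from the prompt above
ALLOWED_CATEGORIES = {
    "Meeting_Ready_Lead", "Power", "Question", "Unsubscribe",
    "OOO", "No_Longer_Works", "Not_Interested", "Info",
}

def normalize_category(raw: str) -> str:
    # Strip whitespace and trailing periods the model might add
    candidate = raw.strip().strip(".")
    return candidate if candidate in ALLOWED_CATEGORIES else "Info"

print(normalize_category(" Unsubscribe. "))       # Unsubscribe
print(normalize_category("This looks like spam"))  # Info
```

You could call this on `category` inside `_run` before logging and returning it.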
Creating API Routes
Now we have completed the initial setup, it’s time to setup our API routes which we discussed earlier.
Let’s create a route to run our agent which will take email id and campaign id as a body. We will pass this information to our agent and run it 🤖
class Email_Body(BaseModel):
    email: str
    campaign_id: int

@app.post("/generate-email")
async def generate_email(body: Email_Body):
    # Create an entry in the DB with status RUNNING
    email_data = Item(
        email=body.email,
        category="",
        logs=[],
        generated_response="",
        message="",
        campaign_id=body.campaign_id,
        status="RUNNING",
    )
    db.add(email_data)
    db.commit()
    db.refresh(email_data)
    # Run the agent and get the response
    response = run_agent(body.campaign_id, body.email)
    # Store the agent's response in the DB
    item = db.query(Item).filter(Item.email == body.email).first()
    if item:
        item.category = response["response"]["category"]
        item.generated_response = response["response"]["reply"]
        item.message = response["message"]
        item.status = "COMPLETED"
        db.commit()
        db.refresh(item)
    return response
Next, let’s create a route to fetch all previously generated emails so that we can show them in our Streamlit application.
@app.get("/get-emails")
async def get_emails():
    records = db.query(Item).all()
    # Drop SQLAlchemy's internal state so the records are JSON serializable
    records_json = [
        {k: v for k, v in record.__dict__.items() if not k.startswith("_")}
        for record in records
    ]
    return records_json
And finally, create a route to get the live agent runs so that users can track agent activity in real time.
@app.get("/get-live-logs")
async def get_live_logs():
    items = db.query(Item).filter(Item.status == "RUNNING")
    records_json = [
        {k: v for k, v in item.__dict__.items() if not k.startswith("_")}
        for item in items
    ]
    return records_json
And we have successfully created a server 🚀! You can host this code on any serverless platform like AWS Lambda, or run it locally for personal use.
Streamlit User Interface
Now let’s build a user interface where users can see all previously generated emails and the agent’s live activity while it’s running. We will use Streamlit and create two pages: 1) History and 2) Live Logs. So let’s get started!
Creating The Interface
First, create a Python project and install the required dependencies:
pip install streamlit requests
Create a file called “main.py” and add the code below:
import streamlit as st
import requests

live_data = []

# Display logs when the expander is clicked
def show_logs(logs):
    for log in logs:
        st.markdown(f'''
```
{log}
```
''')

def main():
    global live_data
    # Custom CSS for labels and categories (omitted here)
    custom_css = """
    """
    st.markdown(custom_css, unsafe_allow_html=True)
    # Sidebar navigation
    with st.sidebar:
        selected_tab = st.sidebar.selectbox("Navigation", ["Live Logs", "History"], index=0)
    if selected_tab == "Live Logs":
        # Display live logs here
        pass
    elif selected_tab == "History":
        # Display history here
        pass

if __name__ == "__main__":
    main()
Creating Live Agent Logs Page
We will create a page that shows live agent activity, where users can see all running agent instances in real time.
We will call the “/get-live-logs” API endpoint we just created on the server to fetch the agent runs that are in progress.
Let’s write the code to show live agent logs. Add the code below under the first if condition:
st.title("Live Agent Logs")
st.write("Here you can see the live logs from the running agent")

# Get the live logs from the server
response = requests.get("https://yourserver.com/get-live-logs")
live_data = response.json()

# Add a refresh button
refresh_button_placeholder = st.empty()
refresh_button_placeholder.button("Refresh", on_click=refresh_card)

# Display cards
for i, card in enumerate(live_data):
    with st.container(border=True):
        st.subheader("Email: {}".format(card["email"]))
        st.markdown(f"""
Status: {card['status']}
""", unsafe_allow_html=True)
        st.write("Category:")
        st.markdown(f"""
```
{card["category"]}
```
""")
        # Display the message and generated reply
        st.write("Message:")
        st.markdown(f"""
```
{card["message"]}
```
""")
        st.write("Generated Response:")
        st.markdown(f"""
```
{card["generated_response"] if card["generated_response"] != "" else "No Response"}
```
""")
        # Show logs when the user clicks the Logs expander
        with st.expander("Logs"):
            show_logs(card["logs"])
We also added a refresh button on the live logs page so users can fetch the latest logs on demand. Let’s write its callback too:
def refresh_card():
    global live_data
    response = requests.get("https://yourserver.com/get-live-logs")
    live_data = response.json()
From the sidebar, you can select the Live Logs page and see the live agent logs, which look like this:
Creating Email History Page
We will create one more page to show all previously generated email responses, using the “/get-emails” API endpoint we created on the server.
Now let’s write the code to show previously generated emails. Add the code below under the elif condition:
st.title("Cold Email Agent")
st.write("An autonomous agent which can reply to your cold emails in your tone")

# Get previous emails
response = requests.get("https://yourserver.com/get-emails")
response = response.json()

# Display cards
for i, card in enumerate(response):
    with st.container(border=True):
        st.subheader("Email: {}".format(card["email"]))
        st.markdown(f"""
Status: {card['status']}
""", unsafe_allow_html=True)
        st.write("Category:")
        st.markdown(f"""
```
{card["category"]}
```
""")
        # Display the message and generated reply
        st.write("Message:")
        st.markdown(f"""
```
{card["message"]}
```
""")
        st.write("Generated Response:")
        st.markdown(f"""
```
{card["generated_response"] if card["generated_response"] != "" else "No Response"}
```
""")
        with st.expander("Logs"):
            show_logs(card["logs"])
And now you can see the previously generated emails in user interface
You can also see the logs after clicking the “Logs” expander.
And we have successfully built both the frontend and backend for our workflow 🎉!
Now it’s time to combine the building blocks using Zapier and we will be ready to use our agent!
Zapier Integration
Let’s first set up our first Zap, where we will capture the webhook event coming from Smartlead. Here are the initial steps to set up the Zapier webhook.
Step-1: Create an account on Zapier and create a new zap.
Step-2: Click on “Add Action” and search “Webhooks by Zapier” and select it
Step-3: Select “Catch Hook” as an event and create a webhook URL which will be your webhook catch URL that can be passed as a webhook URL in smartlead settings.
Step-4: Go to your smartlead dashboard and go to settings → webhooks and click on “Add Webhook” to create a new webhook.
Step-5: Give your webhook a name and add the Zapier catch URL in webhook URL field. Select User as an association type so that your webhook runs for every lead.
Step-6: Smartlead provides webhooks for different kinds of events like “Email Reply”, “Email Click”, and “Email Sent”, but we want to run our agent when we get a reply, so we will select “Email Reply” as the event.
The webhook payload for the “Email Reply” event will look like this (abridged — the beginning of the payload, containing the reply details, is truncated here):
{
    ...
        "text": "##- Please type your reply above this line -##",
        "time": "2023-04-04T08:31:22+00:00"
    },
    "campaign_name": "Link insertion",
    "campaign_id": 100,
    "client_id": null,
    "sequence_number": 1,
    "secret_key": "secretkey",
    "app_url": "https://app.smartlead.ai/app",
    "ui_master_inbox_link": "https://app.smartlead.ai/app",
    "description": "support@test.com replied to Email 1 for campaign - Link insertion",
    "metadata": {
        "webhook_created_at": "2023-09-26T10:48:56.598Z"
    },
    "webhook_url": "https://webhook.site/5168fa7f-0asd-465a-8114-111da474a77",
    "event_type": "EMAIL_REPLY"
}
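The Zapier POST step only needs two values from this payload: the lead’s email and the campaign id. A local sketch of that extraction (the "to_email" key is an assumed field name, not visible in the truncated sample; check the actual keys in your Zapier test data):

```python
import json

# An abridged payload modeled on the sample above; "to_email" is an
# assumed field name for the lead's address
payload = json.loads("""{
    "campaign_id": 100,
    "to_email": "support@test.com",
    "event_type": "EMAIL_REPLY"
}""")

# The body our /generate-email route expects
body = {"email": payload["to_email"], "campaign_id": payload["campaign_id"]}
print(body)
```

In Zapier you map these fields visually rather than in code, but the resulting POST body is the same.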
This is how your final webhook config will look like 👇
Step-7: To test the webhook, click the “Send Test To Webhook” button. Then, on the Zapier dashboard, open the step we just created, go to the test section, and click “Test webhook” — you will see the information from the Smartlead webhook arrive in your Zapier catch hook.
Step-8: Finally, we will trigger our API after this step. Click “Add Step”, select “Webhooks by Zapier” again, choose “POST” as the event, and click Continue.
Step-9: Add your server URL in the URL field and specify the body data to send to the server. In our case, we send the lead email and campaign id, as discussed earlier.
Click Continue, and we have finally completed the Zapier configuration! ⚡
Now, whenever a lead replies to your email, the API will be triggered, your agent will run, and every generated reply will be stored in the PostgreSQL database, visible from the Streamlit dashboard we just created.
So now it behaves like a truly autonomous agent: it can classify your emails, gather information about leads, generate replies in your own style and tone, and even reply to leads using the Smartlead API 🚀!
Conclusion
As we discussed at the start, the agent we created in part 1 was not fully autonomous because it still required a human to run and stop it. So we created a server for our agent, triggered by Smartlead webhooks, that runs automatically whenever a lead replies to your email. We also built a user interface to track previously generated emails and real-time agent activity.
So whether you are a small business or an individual who uses cold email to build connections, this agent workflow will serve you well: it does everything for you, from classifying an email to replying to your leads in your tone and writing style. Doesn’t it feel like a miracle? 👀
Want to Automate Your Workflows?
So, whether you are a small team looking for clients, a job seeker looking for better opportunities, a freelancer, or a large organization seeking more clients, cold emails are one of the best ways to make connections, and AI agents can automate this work for you.
If you are looking to build custom AI agents to automate your workflows like this then kindly book a call with us and we will be happy to convert your ideas into reality to make your life easy.
Thanks for reading 😄.
Create Fast — Impress Instantly
Transform Your Business with AI Solutions
Ready to scale with AI? Get a personalized strategy session with our AI experts. Discover how to automate, optimize, and accelerate your business growth.
Looking to build AI solutions? Let's chat. Schedule your consultation today; this is not a sales call, so feel free to come prepared with your technical queries. You'll be meeting Rohan Sawant, the Founder.
In part 1 of creating an AI agent to manage your inbox and cold emails, we saw how we can create an AI agent using Langchain and it works perfectly but it only runs when you run the code and it also replies in CLI. Is it really autonomous? 🤔 I don’t think so because it still requires human interaction to run the agent and we don’t have a storage where we can see all the generated email replies.
What if we can run our agent whenever we receive a reply on our cold email and it will automatically classify and generate the response and then we can see all the generated replies in a beautiful UI. That will make it look more like an agent right? 👀 so let’s do it!
Workflow
Smartlead provides webhooks that will be triggered on different type of events like email reply, email sent, email open etc. We can use these webhooks to run our agent on a specific event. We will use Zapier to create a webhook catch URL which will be triggered by Smartlead webhooks and we will run our agents using Zapier.
We can’t directly trigger the agent python code from Zapier so will create a serverless in which we will create some API routes for different purposes like /generate-response, /get-live-logs or /get-history. You can also use a server for this but serverless will be a bit faster and easy to manage as we only have 2-3 routes.
We will use PostgreSQL to store all the previously generated emails so that users can keep track of past emails and we will use Streamlit to create a user interface to show previously generated email responses and live agent logs coming from our serverless.
Here is how the new workflow will look like 👇
Let’s discuss how we can make it in detail!
Creating Backend Serverless
First of all, let’s create our backend where we will run our agent and interact with database. Basically we will create 3 routes:
/generate-email: To run our agent and generate email response if needed
/get-emails: To get previously generated emails from postgreSQL database
/get-live-logs: To get the live logs of agent if it is running for any email
Create a file called “constants.py” and store your environment variables in it
SMARTLEAD_API_KEY = "d1eefe52-xxxx-xxx-xxxxxx"
OPENAI_KEY = "sk-xxxxxxx"
APOLLO_API_KEY = "Gxxxxxx"
# I am using local postgreSQL server
DATABASE_URL = "postgresql://postgres:shivam@localhost:5432/coldemail_fastapi"
Setting up Database Connection
Now let’s setup our database connection. We will use sqlalchemy to connect our PostgreSQL DB with server.
Create a file called “dbClient.py” and add this code in it:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from schema import Base, Item
from constants import DATABASE_URL
# Create SQLAlchemy engine
engine = create_engine(DATABASE_URL)
# Create a SQLAlchemy session
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create database tables
Base.metadata.create_all(bind=engine)
db = SessionLocal()
We will use Fastapi to create a REST API server and we will use mangum to create a serverless handler which you can use to deploy your serverless application on platforms like aws lambda.
Let’s first import required dependencies and create our first route in “main.py” file
from fastapi import FastAPI
# from app.api.api_v1.api import router as api_router
from mangum import Mangum
from agent import run_agent
from pydantic import BaseModel
from dbClient import db
from schema import Item
from logs import logs,reset_logs
app = FastAPI()
handler = Mangum(app,lifespan="off")
@app.get("/")
async def root():
return {"message": "Hello World!"}
We will store this information about any agent-generated email in our database:
id: A unique id for each entry or column
email: Email of lead
category: The email category given by our agent
logs: Email agent logs to track the generation process and actions it is taking to generate each email response
message: The lead’s reply for which the response was generated
generated_response: The final response generated by agent for given message
campaign_id: Campaign id of the cold email campaign from smartlead
status: Status of process ( it will be RUNNING or COMPLETED ) which can be used to filter out live agent processes.
Create a file called “schema.py” and add the schema for our database:
from sqlalchemy import Column, Integer, String, ARRAY
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Item(Base):
__tablename__ = "emails"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, index=True)
category = Column(String)
logs = Column(ARRAY(String))
generated_response = Column(String)
message = Column(String)
campaign_id = Column(Integer)
status = Column(String)
Creating Logs for our Agent
For every agent event, we will add some information in our logs array so that users can keep track of agent activity in real time. Let’s implement “add_to_logs” function which will add a new entry in current agent logs. Every log will be in form of string which will contain datetime and some information about that activity.
I have created a folder called “helpers” and in that folder I have created a file called “add_to_logs.py”
from sqlalchemy.orm import Session
from schema import Item
from dbClient import db
def add_to_logs(email: str, new_log: str):
# Retrieve the Item object from the database
item = db.query(Item).filter(Item.email == email).first()
print("Adding logs...", new_log)
print(item.logs)
if item:
item.logs = item.logs + [new_log]
# Update the Item object in the database
db.commit()
# db.refresh(item)
print("Updated logs",item.logs)
Let’s add our agent tools in a file called “tools.py” 🔨
from langchain.pydantic_v1 import BaseModel, Field
from typing import Type, List
from langchain.tools import BaseTool
import requests
import json
from constants import APOLLO_API_KEY, OPENAI_KEY
from openai import OpenAI as OpenAIPython
from logs import logs
from datetime import datetime
from helpers.addLogs import add_to_logs
client = OpenAIPython(
# This is the default and can be omitted
api_key=OPENAI_KEY,
)
class EmailWriterToolInput(BaseModel):
latest_reply: str = Field(description="Latest reply from the prospect")
conversation_history: str = Field(description="Array of conversation history")
sender: str = Field(description="Name of sender")
company_info: str = Field(description="Information about sender's company")
email: str = Field(description="lead email")
class EmailWriterTool(BaseTool):
name = "email_writer_tool"
description = "use this tool when you have given a email and you have to construct a reply for it"
args_schema: Type[BaseModel] = EmailWriterToolInput
def _run(self, latest_reply: str, conversation_history: str, sender: str,company_info: str, email: str):
add_to_logs(email=email,new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 📝 Generating Email Response...")
print("📝 Generating Email Response...")
headers = {
"Content-Type": "application/json"
}
data = {
"params": {
"client_email": latest_reply,
"sender":sender,
"conversation_history":conversation_history,
"company_description":company_info
},
"project": "2b9xxxxx-xxxx-xxxxxx-xxx"
}
res = requests.post("https://api-f1db6c.stack.tryrelevance.com/latest/studios/xxxx-xxxxxx-xxxxx/trigger_limited",data=json.dumps(data),headers=headers)
res = res.json()
return res["output"]["answer"]
def _arun(self, url: str):
raise NotImplementedError(
"email writer tool does not support async")
class CompanySearchToolInput(BaseModel):
email: str = Field(description="Email of sender")
category: str = Field(description="Category of email")
class CompanySearchTool(BaseTool):
name = "company_search_tool"
description = "use this tool when you want to get information about any company"
args_schema: Type[BaseModel] = CompanySearchToolInput
def _run(self, email: str, category: str):
add_to_logs(email=email,new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 🔍 searching about company information...")
print("🔍 searching about company information...")
#
data = {
"api_key":APOLLO_API_KEY,
"email":email
}
response = requests.post(f"https://api.apollo.io/v1/people/match",data=data)
response = response.json()
if "organization" in response["person"] and response["person"]["organization"] is not None and response["person"]["organization"]["short_description"] is not None:
return response["person"]["organization"]["short_description"]
else:
return "No data"
def _arun(self, url: str):
raise NotImplementedError(
"categorise_email does not support async")
class CategorizeEmailInput(BaseModel):
conversation: str = Field(description="Email conversation array")
email:str = Field(description="Lead email")
class CategorizeEmailTool(BaseTool):
name = "email_categorizer_tool"
description = "use this tool when have email conversation history and you want to categorize this email"
args_schema: Type[BaseModel] = CategorizeEmailInput
def _run(self, conversation: str,email: str):
add_to_logs(email=email,new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: 📩 Categorizing Email...")
print("📩 Categorizing Email...")
prompt = f"""
Email Conversation History:
---
{conversation}
---
You have given an array of conversation between Rohan Sawant and a client
Your goal is to categorize this email based on the conversation history from the given categories:
1. Meeting_Ready_Lead: they have shown positive intent and are interested in getting on a call
2. Power: If they’re interested and we want to push for a call
3. Question: If they have any question regarding anything
4. Unsubscribe: They want to unsubscribe themselves from our email list
5. OOO: They are out of office
6. No_Longer_Works: They no longer works in the company
7. Not_Interested: They are not interested
8. Info: these are emails that don't fit into any of the above categories.
Note: Your final response MUST BE the category name ONLY
RESPONSE:
"""
message = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": prompt}
]
)
category = message.choices[0].message.content
add_to_logs(email=email,new_log=f"{datetime.now().strftime('%y-%m-%d %H:%M:%S')}: Email successfully categorized as {category}")
print(f"Email successfully categorized as {category}")
return category
def _arun(self):
raise NotImplementedError(
"categorise_email does not support async")
Creating API Routes
Now we have completed the initial setup, it’s time to setup our API routes which we discussed earlier.
Let’s create a route to run our agent which will take email id and campaign id as a body. We will pass this information to our agent and run it 🤖
class Email_Body(BaseModel):
    email: str
    campaign_id: int

@app.post("/generate-email")
async def root(item: Email_Body):
    # Create entry in DB
    email_data = Item(email=item.email, category="", logs=[], generated_response="", message="", campaign_id=item.campaign_id, status="RUNNING")
    db.add(email_data)
    db.commit()
    db.refresh(email_data)
    # Run agent and get the response
    response = run_agent(item.campaign_id, item.email)
    # Add agent response in DB
    record = db.query(Item).filter(Item.email == item.email).first()
    if record:
        record.category = response['response']['category']
        record.generated_response = response['response']['reply']
        record.message = response['message']
        record.status = "COMPLETED"
        # Update the Item object in the database
        db.commit()
        db.refresh(record)
    return response
Let's create a route to get all the previously generated emails so that we can show them in our Streamlit application.
@app.get("/get-emails")
async def root():
    records = db.query(Item).all()
    # Drop SQLAlchemy's internal state key so the rows are JSON serializable
    records_json = [
        {k: v for k, v in record.__dict__.items() if k != "_sa_instance_state"}
        for record in records
    ]
    return records_json
And finally, create a route to get the live sessions of the agent so that users can track its activity in real time.
@app.get("/get-live-logs")
async def root():
    items = db.query(Item).filter(Item.status == "RUNNING")
    # Drop SQLAlchemy's internal state key so the rows are JSON serializable
    records_json = [
        {k: v for k, v in record.__dict__.items() if k != "_sa_instance_state"}
        for record in items
    ]
    return records_json
And we have successfully created the server 🚀! You can host this code on any serverless platform like AWS Lambda, or run it locally for personal use.
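In case you're following along from scratch: the routes above rely on a FastAPI `app` object (`app = FastAPI()`) plus an SQLAlchemy `Item` model and session. A minimal sketch of that setup, with column names inferred from the route code, might look like this (SQLite stands in for PostgreSQL here so the snippet runs standalone; swap the connection string for your Postgres URL):

```python
from sqlalchemy import create_engine, Column, Integer, String, JSON
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Item(Base):
    __tablename__ = "emails"
    id = Column(Integer, primary_key=True)
    email = Column(String)               # lead email
    category = Column(String)            # category assigned by the agent
    logs = Column(JSON)                  # list of timestamped log lines
    generated_response = Column(String)  # reply drafted by the agent
    message = Column(String)
    campaign_id = Column(Integer)
    status = Column(String)              # "RUNNING" or "COMPLETED"

# In-memory SQLite for illustration; the blog's workflow uses PostgreSQL,
# e.g. create_engine("postgresql://user:password@host/dbname")
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
db = sessionmaker(bind=engine)()
```

With this in place, the `/generate-email` route can insert a `RUNNING` row before the agent starts and update it to `COMPLETED` when the reply is ready.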
Streamlit User Interface
Now let's build a user interface where users can see all the previously generated emails and the live activity of the agent while it's running. We will use Streamlit to create this user interface with two pages: 1) History and 2) Live Logs. So let's get started!
Creating The Interface
First, create a Python project and install the required dependencies:
pip install streamlit requests
Create a file called "main.py" and add the below code:
import streamlit as st
import requests

live_data = []

# Define a function to display logs when the dropdown is clicked
def show_logs(logs):
    for log in logs:
        st.markdown(f'''
```
{log}
```
''')

def main():
    global live_data
    # Custom CSS for some labels and categories
    custom_css = """
    """
    st.markdown(custom_css, unsafe_allow_html=True)
    # Title and description
    with st.sidebar:
        selected_tab = st.selectbox("Navigation", ["Live Logs", "History"], index=0)
    if selected_tab == "Live Logs":
        # Display live logs here
        pass
    elif selected_tab == "History":
        # Display History here
        pass

if __name__ == "__main__":
    main()
Creating Live Agent Logs Page
We will create a page to show live agent activity, where users can see every running instance of our agent and track what it is doing in real time.
We will call the "/get-live-logs" API endpoint we just created on our server to fetch the agent instances that are currently running.
Let's write the code to show live agent logs. Add the below code inside the first `if` condition:
st.title("Live Agent Logs")
st.write("Here you can see the live logs from the running agent")

# Get the live logs from the server
response = requests.get("http://yourserver.com/get-live-logs")
live_data = response.json()

# Add a refresh button (refresh_card is defined below)
refresh_button_placeholder = st.empty()
refresh_button_placeholder.button("Refresh", on_click=refresh_card)

# Display cards
for i, card in enumerate(live_data):
    with st.container(border=True):
        st.subheader("Email: {}".format(card["email"]))
        st.markdown(f"""
Status:
{card['status']}
""", unsafe_allow_html=True)
        st.write("Category:")
        st.markdown(f"""
```
{card["category"]}
```
""")
        # Display email and reply
        st.write("Message:")
        st.markdown(f"""
```
{card["message"]}
```
""")
        st.write("Generated Response:")
        st.markdown(f"""
```
{card["generated_response"] if card["generated_response"] != "" else "No Response"}
```
""")
        # Whenever the user clicks on the logs expander, we will show them the logs
        with st.expander("Logs"):
            show_logs(card["logs"])
We will also add a refresh button to the live logs cards so that users can click Refresh to get the latest logs. Let's write the code for that too!
def refresh_card():
    global live_data
    response = requests.get("https://yourserver.com/beta/get-live-logs")
    live_data = response.json()
From the sidebar, you can select the Live Logs page and see the live agent logs, which look like this:
Creating Email History Page
We will create one more page to show all the previously generated email responses. Here we will use the "/get-emails" API endpoint we created on our server.
Now let's write the code to show previously generated emails. Add the below code inside the `elif` condition:
st.title("Cold Email Agent")
st.write("An autonomous agent which can reply to your cold emails in your tone")

# Get previous emails
response = requests.get("https://yourserver.com/beta/get-emails")
response = response.json()

# Display cards
for i, card in enumerate(response):
    with st.container(border=True):
        st.subheader("Email: {}".format(card["email"]))
        st.markdown(f"""
Status:
{card['status']}
""", unsafe_allow_html=True)
        st.write("Category:")
        st.markdown(f"""
```
{card["category"]}
```
""")
        # Display email and reply
        st.write("Message:")
        st.markdown(f"""
```
{card["message"]}
```
""")
        st.write("Generated Response:")
        st.markdown(f"""
```
{card["generated_response"] if card["generated_response"] != "" else "No Response"}
```
""")
        with st.expander("Logs"):
            show_logs(card["logs"])
And now you can see the previously generated emails in the user interface.
You can also see the logs by clicking the "Logs" expander.
And we have successfully built both the frontend and backend for our workflow 🎉!
Now it's time to combine the building blocks using Zapier, and our agent will be ready to use!
Zapier Integration
Let's first set up our first zap, where we will capture the webhook event coming from Smartlead. Here are the initial steps to set up the Zapier webhook:
Step-1: Create an account on Zapier and create a new zap.
Step-2: Click on "Add Action", search for "Webhooks by Zapier", and select it.
Step-3: Select "Catch Hook" as the event and copy the generated webhook URL; this catch URL is what you will paste into the Smartlead webhook settings.
Step-4: Go to your Smartlead dashboard, open Settings → Webhooks, and click "Add Webhook" to create a new webhook.
Step-5: Give your webhook a name and add the Zapier catch URL in the webhook URL field. Select "User" as the association type so that your webhook runs for every lead.
Step-6: Smartlead provides webhooks for different kinds of events like "Email Reply", "Email Click", "Email Sent", etc., but here we want to run our agent when we get a reply, so we will select "Email Reply" as the event.
The webhook payload for the "Email Reply" event will look like this (truncated here for brevity):
    ",
    "text": "##- Please type your reply above this line -##",
    "time": "2023-04-04T08:31:22+00:00"
  },
  "campaign_name": "Link insertion",
  "campaign_id": 100,
  "client_id": null,
  "sequence_number": 1,
  "secret_key": "secretkey",
  "app_url": "https://app.smartlead.ai/app",
  "ui_master_inbox_link": "https://app.smartlead.ai/app",
  "description": "support@test.com replied to Email 1 for campaign - Link insertion ",
  "metadata": {
    "webhook_created_at": "2023-09-26T10:48:56.598Z"
  },
  "webhook_url": "https://webhook.site/5168fa7f-0asd-465a-8114-111da474a77",
  "event_type": "EMAIL_REPLY"
}
This is how your final webhook config will look like 👇
Step-7: To test the webhook, click the "Send Test To Webhook" button in Smartlead. Then, on the Zapier dashboard, click the step we just created, go to the Test section, and click "Test webhook"; you should see the information coming from the Smartlead webhook in your Zapier catch hook.
Step-8: Finally, we will trigger our API after this action. Click "Add Step", again select "Webhooks by Zapier", choose "POST" as the event, and click Continue.
Step-9: Add your server URL in the URL field and specify the body data you want to send to the server. In our case, we are going to send the lead email and campaign ID, as discussed before.
Click Continue, and we have finally completed the Zapier configuration! ⚡
Now, whenever a lead replies to your email, the API will be triggered, your agent will run, and all the generated email replies will be stored in the PostgreSQL database, which you can browse from the Streamlit dashboard we just created.
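If you want to sanity-check the route without waiting for a real reply, you can build the same POST body Zapier sends and fire it yourself. A minimal sketch (the URL is a placeholder; the field names must match the `Email_Body` model from the `/generate-email` route):

```python
import json

# Placeholder URL; replace with your deployed server's /generate-email route
url = "https://yourserver.com/generate-email"

# Same fields Zapier extracts from the Smartlead "Email Reply" payload
payload = {
    "email": "lead@example.com",   # lead's email address from the webhook
    "campaign_id": 100,            # Smartlead campaign ID from the webhook
}
body = json.dumps(payload)

# Send it with, e.g.:
# requests.post(url, data=body, headers={"Content-Type": "application/json"})
```

This is handy for debugging: if the manual POST creates a `RUNNING` row in the database and the Streamlit dashboard shows it, the remaining failures can only be in the Zapier wiring.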
So now it looks like an actual autonomous agent: it can classify your emails, gather information about leads, generate replies in your own style and tone, and reply to leads using the Smartlead API 🚀!
Conclusion
As we discussed at the start, the agent we created in part 1 was not truly autonomous because it still required human interaction to run and stop it. So we created a server for our agent, triggered by Smartlead webhooks, so that it runs automatically whenever a lead replies to your email. We also created a beautiful user interface to track the previously generated emails and the real-time agent activity.
So whether you are a small business or an individual who uses cold email to build connections and grow your network, this agent workflow will serve you well: it does everything from classifying an email to replying to your leads in your tone and writing style. Doesn't it feel like a miracle? 👀
Want to Automate Your Workflows?
So, whether you are a small team looking for clients, a job seeker looking for better opportunities, a freelancer looking for clients, or a large organization seeking more customers, cold email is one of the best ways to make new connections, and AI agents can automate this work for you.
If you are looking to build custom AI agents to automate workflows like this, kindly book a call with us and we will be happy to turn your ideas into reality and make your life easier.