[Cell 001]
# Install the crewai package, upgrading any already-installed version (-U).
!pip install -U crewai
[Cell 002]
from google.colab import userdata
import os
# Fetch the value stored under 'GEMINI_API_KEY_006' through google.colab.userdata
# and publish it to this process as GOOGLE_API_KEY.
os.environ["GOOGLE_API_KEY"] = userdata.get('GEMINI_API_KEY_006')
# Model identifier, published through the MODEL environment variable.
os.environ["MODEL"] = "gemini/gemini-3.1-flash-lite"
# Alternative that also works: set the variables with the %env magic instead.
# NOTE: this variant exports the key as GEMINI_API_KEY rather than GOOGLE_API_KEY.
# from google.colab import userdata
# # This sets it for both Python AND any !shell commands you run later
# %env GEMINI_API_KEY={userdata.get('GEMINI_API_KEY_006')}
# # %env MODEL=gemini/gemini-3.1-flash-lite  # only if MODEL is required in the environment
[Cell 003]
#!/usr/bin/env python
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators
@CrewBase
class HelloWorld:
    """Crew definition with two agents (researcher, reporting analyst) and two tasks.

    Agents and tasks are configured inline rather than from YAML; the
    ``{topic}`` and ``{current_year}`` placeholders are left in the strings
    and are expected to be filled from the inputs given at kickoff.
    """

    agents: list[BaseAgent]
    tasks: list[Task]

    # YAML-based configuration is the documented alternative to the inline
    # keyword arguments used below:
    #   Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
    #   Tasks:  https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
    # Adding tools to agents: https://docs.crewai.com/concepts/agents#agent-tools
    # Hooks that run before/after the crew starts (@before_kickoff, @after_kickoff):
    #   https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators

    @agent
    def researcher(self) -> Agent:
        """Return the agent that researches ``{topic}``."""
        # YAML equivalent: config=self.agents_config['researcher']
        researcher_backstory = (
            "You're a seasoned researcher with a knack for uncovering the latest\n"
            "developments in {topic}. Known for your ability to find the most relevant\n"
            "information and present it in a clear and concise manner."
        )
        return Agent(
            role="{topic} Senior Data Researcher",
            goal="Uncover cutting-edge developments in {topic}",
            backstory=researcher_backstory,
            verbose=True,
        )

    @agent
    def reporting_analyst(self) -> Agent:
        """Return the agent that turns research findings into a report."""
        # YAML equivalent: config=self.agents_config['reporting_analyst']
        analyst_backstory = (
            "You're a meticulous analyst with a keen eye for detail. You're known for\n"
            "your ability to turn complex data into clear and concise reports, making\n"
            "it easy for others to understand and act on the information you provide."
        )
        return Agent(
            role="{topic} Reporting Analyst",
            goal="Create detailed reports based on {topic} data analysis and research findings",
            backstory=analyst_backstory,
            verbose=True,
        )

    # Structured task outputs, task dependencies and task callbacks:
    # https://docs.crewai.com/concepts/tasks#overview-of-a-task

    @task
    def research_task(self) -> Task:
        """Return the research task, assigned to the researcher agent."""
        # YAML equivalent: config=self.tasks_config['research_task']
        research_brief = (
            "Conduct a thorough research about {topic}\n"
            "Make sure you find any interesting and relevant information given\n"
            "the current year is {current_year}.\n"
        )
        research_deliverable = (
            "A list with 10 bullet points of the most relevant information about {topic}"
        )
        return Task(
            description=research_brief,
            expected_output=research_deliverable,
            agent=self.researcher(),
        )

    @task
    def reporting_task(self) -> Task:
        """Return the reporting task, assigned to the reporting analyst.

        The task is created with ``output_file='report.md'``.
        """
        # YAML equivalent: config=self.tasks_config['reporting_task']
        report_brief = (
            "Review the context you got and expand each topic into a full section for a report.\n"
            "Make sure the report is detailed and contains any and all relevant information."
        )
        report_deliverable = (
            "A fully fledged report with the main topics, each with a full section of information.\n"
            "Formatted as markdown without '```'"
        )
        return Task(
            description=report_brief,
            expected_output=report_deliverable,
            agent=self.reporting_analyst(),
            output_file='report.md',
        )

    @crew
    def crew(self) -> Crew:
        """Creates the HelloWorld crew from both agents and both tasks, run sequentially."""
        # Knowledge sources: https://docs.crewai.com/concepts/knowledge#what-is-knowledge
        # The lists are spelled out explicitly here; self.agents / self.tasks
        # are the decorator-populated alternative.
        crew_agents = [self.researcher(), self.reporting_analyst()]
        crew_tasks = [self.research_task(), self.reporting_task()]
        # Process.hierarchical is the other option:
        # https://docs.crewai.com/how-to/Hierarchical/
        return Crew(
            agents=crew_agents,
            tasks=crew_tasks,
            process=Process.sequential,
            verbose=True,
        )
import sys
import warnings
from datetime import datetime
#from hello_world.crew import HelloWorld
# Ignore SyntaxWarning messages whose originating module name matches "pysbd".
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
# This main file is intended to be a way for you to run your
# crew locally, so refrain from adding unnecessary logic into this file.
# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information
def run():
    """
    Run the crew.

    Kicks off the HelloWorld crew with a fixed topic ('AI LLMs') and the
    current calendar year as a string.

    Raises:
        Exception: if building or running the crew fails; the original
            error is attached as ``__cause__``.
    """
    inputs = {
        'topic': 'AI LLMs',
        'current_year': str(datetime.now().year)
    }
    try:
        HelloWorld().crew().kickoff(inputs=inputs)
    except Exception as e:
        # Chain explicitly so the traceback shows the underlying failure as
        # the direct cause instead of an incidental "during handling" context.
        raise Exception(f"An error occurred while running the crew: {e}") from e
def train():
    """
    Train the crew for a given number of iterations.

    ``sys.argv[1]`` is converted to int and passed as ``n_iterations``;
    ``sys.argv[2]`` is passed as ``filename``.

    Raises:
        Exception: if argument parsing or training fails; the original
            error is attached as ``__cause__``.
    """
    inputs = {
        "topic": "AI LLMs",
        'current_year': str(datetime.now().year)
    }
    try:
        HelloWorld().crew().train(n_iterations=int(sys.argv[1]), filename=sys.argv[2], inputs=inputs)
    except Exception as e:
        # Keep the root error reachable through __cause__.
        raise Exception(f"An error occurred while training the crew: {e}") from e
def replay():
    """
    Replay the crew execution from a specific task.

    ``sys.argv[1]`` is passed as ``task_id``.

    Raises:
        Exception: if reading the argument or replaying fails; the original
            error is attached as ``__cause__``.
    """
    try:
        HelloWorld().crew().replay(task_id=sys.argv[1])
    except Exception as e:
        # Keep the root error reachable through __cause__.
        raise Exception(f"An error occurred while replaying the crew: {e}") from e
def test():
    """
    Test the crew execution.

    ``sys.argv[1]`` is converted to int and passed as ``n_iterations``;
    ``sys.argv[2]`` is passed as ``eval_llm``. Nothing is returned.

    Raises:
        Exception: if argument parsing or the test run fails; the original
            error is attached as ``__cause__``.
    """
    inputs = {
        "topic": "AI LLMs",
        "current_year": str(datetime.now().year)
    }
    try:
        HelloWorld().crew().test(n_iterations=int(sys.argv[1]), eval_llm=sys.argv[2], inputs=inputs)
    except Exception as e:
        # Keep the root error reachable through __cause__.
        raise Exception(f"An error occurred while testing the crew: {e}") from e
def run_with_trigger():
    """
    Run the crew with trigger payload.

    ``sys.argv[1]`` must hold a JSON document. It is parsed and passed to
    the crew under the ``crewai_trigger_payload`` input key, with ``topic``
    and ``current_year`` set to empty strings.

    Returns:
        Whatever ``kickoff`` returns for the crew.

    Raises:
        Exception: if no payload argument is given, if the payload is not
            valid JSON, or if the crew run fails. In the last two cases the
            original error is attached as ``__cause__``.
    """
    import json

    if len(sys.argv) < 2:
        raise Exception("No trigger payload provided. Please provide JSON payload as argument.")

    try:
        trigger_payload = json.loads(sys.argv[1])
    except json.JSONDecodeError as e:
        # Chain the decode error so its position details are not lost.
        raise Exception("Invalid JSON payload provided as argument") from e

    inputs = {
        "crewai_trigger_payload": trigger_payload,
        "topic": "",
        "current_year": ""
    }

    try:
        return HelloWorld().crew().kickoff(inputs=inputs)
    except Exception as e:
        raise Exception(f"An error occurred while running the crew with trigger: {e}") from e
# Start the crew as soon as this cell is executed.
run()
====================================================================================
OUTPUT
====================================================================================
No comments:
Post a Comment