from agno.workflow import Workflow, OnReject
from agno.workflow.condition import Condition
from agno.workflow.step import Step
from agno.workflow.types import HumanReview, StepInput, StepOutput
from agno.db.sqlite import SqliteDb
def detailed_analysis(step_input: StepInput) -> StepOutput:
    """Return the canned result for the detailed-analysis step.

    Args:
        step_input: Input passed to the executor; this function does not
            read it.

    Returns:
        A ``StepOutput`` whose content is a fixed detailed-analysis message.
    """
    content = "Detailed analysis: Full review completed"
    return StepOutput(content=content)
def quick_summary(step_input: StepInput) -> StepOutput:
    """Return the canned result for the quick-summary step.

    Args:
        step_input: Input passed to the executor; it is ignored here.

    Returns:
        A ``StepOutput`` whose content is a fixed quick-summary message.
    """
    summary_text = "Quick summary: Key highlights identified"
    result = StepOutput(content=summary_text)
    return result
# Database object handed to the workflow: SQLite, stored in "workflow.db".
run_db = SqliteDb(db_file="workflow.db")

# NOTE(review): analyze_data and generate_report are not defined in the
# visible part of this file -- presumably defined elsewhere; confirm.
analyze_step = Step(name="analyze", executor=analyze_data)

# Human-gated choice between the two analyses: `steps` holds the detailed
# analysis, `else_steps` the quick summary. The review asks for confirmation
# and is configured with OnReject.else_branch for the rejection case.
depth_gate = Condition(
    name="analysis_depth",
    steps=[Step(name="detailed", executor=detailed_analysis)],
    else_steps=[Step(name="quick", executor=quick_summary)],
    human_review=HumanReview(
        requires_confirmation=True,
        confirmation_message="Perform detailed analysis?",
        on_reject=OnReject.else_branch,
    ),
)

report_step = Step(name="report", executor=generate_report)

# Three entries in order: analyze -> gated analysis depth -> report.
workflow = Workflow(
    name="analysis_workflow",
    db=run_db,
    steps=[analyze_step, depth_gate, report_step],
)
# Start the run. The returned object reports whether it is paused.
run_output = workflow.run("Analyze Q4 data")

if run_output.is_paused:
    # Answer every pending confirmation before resuming the run.
    for req in run_output.steps_requiring_confirmation:
        print(f"Decision: {req.step_name}")
        print(f"Message: {req.confirmation_message}")

        # Strip surrounding whitespace before comparing: without it a reply
        # such as "y " or " Y" would fall through to the reject path.
        answer = input("Confirm? (y/n): ").strip().lower()
        if answer == "y":
            req.confirm()
            print("Executing 'if' branch")
        else:
            req.reject()
            print("Executing 'else' branch")

    # Resume once, after all requirements above have been answered.
    run_output = workflow.continue_run(run_output)

print(run_output.content)