orchestration¶
Eino-based workflow orchestration for multi-step agent workflows.
Graph Builder¶
Create type-safe workflow graphs:
import (
"github.com/cloudwego/eino/compose"
"github.com/plexusone/agentkit/orchestration"
)
type Input struct {
Query string
}
type Output struct {
Result string
}
// Create builder
builder := orchestration.NewGraphBuilder[*Input, *Output]("my-workflow")
graph := builder.Graph()
Adding Nodes¶
Use Eino's InvokableLambda for processing nodes:
// Processing node
processLambda := compose.InvokableLambda(func(ctx context.Context, input *Input) (*Intermediate, error) {
return &Intermediate{Data: input.Query}, nil
})
graph.AddLambdaNode("process", processLambda)
// Formatting node
formatLambda := compose.InvokableLambda(func(ctx context.Context, data *Intermediate) (*Output, error) {
return &Output{Result: data.Data}, nil
})
graph.AddLambdaNode("format", formatLambda)
Connecting Nodes¶
// Start -> process
builder.AddStartEdge("process")
// process -> format
builder.AddEdge("process", "format")
// format -> End
builder.AddEndEdge("format")
Building and Executing¶
// Build the graph
finalGraph := builder.Build()
// Create executor
executor := orchestration.NewExecutor(finalGraph, "my-workflow")
// Execute
result, err := executor.Execute(ctx, &Input{Query: "test"})
if err != nil {
log.Fatal(err)
}
log.Printf("Result: %s", result.Result)
HTTP Handler¶
Expose workflows as HTTP endpoints:
executor := orchestration.NewExecutor(graph, "my-workflow")
// Create HTTP handler (handles JSON encode/decode, errors)
handler := orchestration.NewHTTPHandler(executor)
// Use with httpserver
server, _ := httpserver.NewBuilder("my-agent", 8001).
WithHandler("/workflow", handler).
Build()
Agent Caller¶
Call other agents from within workflows:
import "github.com/plexusone/agentkit/orchestration"
caller := orchestration.NewAgentCaller(httpClient)
// Call another agent
response, err := caller.Call(ctx, "http://research-agent:8001/research", request)
Multi-Step Workflow Example¶
type ResearchInput struct {
Topic string
}
type ResearchOutput struct {
Summary string
Sources []string
}
type IntermediateState struct {
Topic string
Findings []string
Verified []string
}
func buildWorkflow() *orchestration.Executor[*ResearchInput, *ResearchOutput] {
builder := orchestration.NewGraphBuilder[*ResearchInput, *ResearchOutput]("research")
graph := builder.Graph()
// Research step
researchLambda := compose.InvokableLambda(func(ctx context.Context, input *ResearchInput) (*IntermediateState, error) {
// Call research agent
findings := doResearch(ctx, input.Topic)
return &IntermediateState{
Topic: input.Topic,
Findings: findings,
}, nil
})
graph.AddLambdaNode("research", researchLambda)
// Verification step
verifyLambda := compose.InvokableLambda(func(ctx context.Context, state *IntermediateState) (*IntermediateState, error) {
// Verify findings
verified := verifyFindings(ctx, state.Findings)
state.Verified = verified
return state, nil
})
graph.AddLambdaNode("verify", verifyLambda)
// Synthesis step
synthesizeLambda := compose.InvokableLambda(func(ctx context.Context, state *IntermediateState) (*ResearchOutput, error) {
summary := synthesize(ctx, state.Verified)
return &ResearchOutput{
Summary: summary,
Sources: state.Verified,
}, nil
})
graph.AddLambdaNode("synthesize", synthesizeLambda)
// Connect
builder.AddStartEdge("research")
builder.AddEdge("research", "verify")
builder.AddEdge("verify", "synthesize")
builder.AddEndEdge("synthesize")
return orchestration.NewExecutor(builder.Build(), "research")
}
Conditional Branching¶
// Add branch node
graph.AddBranch("router", func(ctx context.Context, input *Input) (string, error) {
if input.NeedsVerification {
return "verify", nil
}
return "synthesize", nil
})
builder.AddStartEdge("router")
builder.AddEdge("router", "verify")
builder.AddEdge("router", "synthesize")
builder.AddEdge("verify", "synthesize")
builder.AddEndEdge("synthesize")
Error Handling¶
Errors propagate through the workflow: