actor

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2026 License: MIT Imports: 16 Imported by: 0

README

Actor 模型实现

这是一个用 Go 语言实现的高性能 Actor 模型库,提供了轻量级的并发编程模型。

特性

  • 高性能的 Actor 调度器
  • 类型安全的消息传递
  • 支持监督策略
  • 动态 Actor 生命周期管理
  • 内置邮箱实现,支持多种队列后端
  • Future/Promise 异步编程支持
  • 可扩展的注册表和分片注册表
  • 内置性能指标和监控

安装

go get github.com/adnilis/actor

快速开始

定义 Actor
type MyActor struct {
    // Actor 状态
    count int
}

// Receive 方法处理消息
func (a *MyActor) Receive(ctx actor.ActorContext) {
    switch msg := ctx.Message().(type) {
    case string:
        a.count++
        fmt.Printf("收到消息: %s, 计数: %d\n", msg, a.count)
    case int:
        a.count += msg
        fmt.Printf("收到数字: %d, 计数: %d\n", msg, a.count)
    }
}
创建 Actor 系统
import "github.com/adnilis/actor"

// 创建 Actor 系统
system, err := actor.NewActorSystem("my-system")
if err != nil {
    panic(err)
}
defer system.Shutdown(nil)

// 定义 Actor 配置
props := actor.NewProps(func() actor.Actor {
    return &MyActor{}
})

// 生成 Actor 引用
ref, err := system.Spawn(props)
if err != nil {
    panic(err)
}
发送消息
// 发送异步消息
ref.Tell("Hello Actor!")

// 发送请求并等待响应
future, err := ref.Ask("request", time.Second*5)
if err != nil {
    // 处理错误
}
result, err := future.Result()
if err != nil {
    // 处理错误
}

核心概念

Actor

Actor 是并发执行的基本单元,每个 Actor 都有自己的状态和消息处理逻辑。Actor 之间通过消息传递进行通信,避免了共享内存和锁的复杂性。

上下文 (Context)

Context 提供了 Actor 执行时的上下文信息,包括消息内容、发送者引用、Actor 引用等。

引用 (Ref)

Ref 是 Actor 的引用,用于向 Actor 发送消息。Ref 是线程安全的,可以在多个 Goroutine 之间共享。

邮箱 (Mailbox)

每个 Actor 都有一个邮箱,用于存储收到的消息。邮箱支持多种队列实现,包括:

  • 无锁环形队列 (goring)
  • MPSC 队列
  • 自定义队列实现
调度器 (Dispatcher)

调度器负责将邮箱中的消息分发给 Actor 进行处理。调度器使用 Goroutine 池来执行 Actor 的消息处理逻辑。

监督 (Supervision)

Actor 系统支持监督策略,可以定义 Actor 失败时的处理方式:

  • 重启 Actor
  • 停止 Actor
  • 向上级传递错误
  • 恢复执行

高级特性

Future/Promise

支持异步请求/响应模式:

future := ref.Ask("request", time.Second*5)

// 异步等待结果
future.OnComplete(func(result interface{}, err error) {
    if err != nil {
        // 处理错误
        return
    }
    // 处理结果
})

// 或者同步等待
result, err := future.Result()
分片注册表

对于大规模系统,可以使用分片注册表来提高性能:

registry := actor.NewShardedRegistry(16) // 16 个分片
system := actor.NewSystem("sharded-system", actor.WithRegistry(registry))
性能监控

内置性能指标收集:

// 启用指标收集
system := actor.NewSystem("monitored-system", actor.WithMetrics())

// 获取指标
metrics := system.Metrics()
fmt.Printf("处理的消息数: %d\n", metrics.MessagesProcessed)
fmt.Printf("平均处理时间: %v\n", metrics.AverageProcessingTime)

性能

该 Actor 实现经过高度优化,在现代硬件上表现出色。以下是在 Intel Core i9-14900HX 处理器上的实际基准测试结果:

Actor 核心性能
测试场景 每秒操作数 平均延迟 内存分配
单 Actor 消息发送 (CallingThreadDispatcher) 40,338,000 ops/s 30.0 ns/op 0 B/op, 0 allocs/op
单 Actor 消息发送 (Default Dispatcher) 8,003,000 ops/s 137 ns/op 63 B/op, 2 allocs/op
单 Actor 并行消息发送 3,176,000 ops/s 361 ns/op 56 B/op, 2 allocs/op
Context Ask 请求响应 7,245,000 ops/s 168 ns/op 88 B/op, 3 allocs/op
1000 Actor 并发 476,000 ops/s 2,692 ns/op 773 B/op, 8 allocs/op
Actor 创建/销毁 1,000,000 ops/s 1,057 ns/op 1,119 B/op, 18 allocs/op
队列性能
队列类型 测试场景 每秒操作数 平均延迟 内存分配
Goring 环形队列 单线程 Push+Pop 35,791,000 ops/s 32.5 ns/op 8 B/op, 0 allocs/op
Goring 环形队列 8线程并发生产 19,296,000 ops/s 68.4 ns/op 63 B/op, 0 allocs/op
MPSC 队列 单线程 Push+Pop 22,673,000 ops/s 51.8 ns/op 8 B/op, 0 allocs/op
MPSC 队列 8线程并发生产 5,395,000 ops/s 245 ns/op 24 B/op, 1 allocs/op
Worker Pool 性能(32 个工作线程)
测试场景 每秒操作数 平均延迟 内存分配
消息处理吞吐量 8,003,000 ops/s 137 ns/op 63 B/op, 2 allocs/op
并行任务处理 3,176,000 ops/s 361 ns/op 56 B/op, 2 allocs/op
请求响应模式 7,245,000 ops/s 168 ns/op 88 B/op, 3 allocs/op
关键特性
  • 极致性能: CallingThreadDispatcher 实现零分配、30纳秒级消息处理
  • 低延迟: 核心消息处理延迟仅 137 纳秒(默认调度器)
  • 高并发: 支持多核并行处理,并行发送可达 317 万 ops/s
  • 快速请求响应: Ask 操作仅需 168 纳秒
  • 高效队列: 无锁环形队列实现 3500 万 ops/s,MPSC 队列 2200 万 ops/s
  • Worker Pool 优化: 32个工作线程实现高度并行化处理
# 运行所有基准测试
go test -bench=. -benchmem ./tests/

# 仅运行队列基准测试
go test -bench="Queue" -benchmem ./tests/

# 仅运行Actor基准测试
go test -bench="BenchmarkActor" -benchmem ./tests/

# 仅运行Worker Pool基准测试
go test -bench="WorkerPool" -benchmem ./tests/

测试

运行测试:

go test ./...

运行基准测试:

go test -bench=.

许可证

MIT License - 详见 LICENSE 文件。

Documentation

Index

Constants

View Source
const (
	// NumRegistryBuckets 分片注册表的桶数量
	NumRegistryBuckets = 256
)

Variables

View Source
var (
	// ErrActorNotFound actor未找到错误
	ErrActorNotFound = errors.New("actor not found")
	// ErrActorStopped actor已停止错误
	ErrActorStopped = errors.New("actor stopped")

	// DeadLetters 死信actor引用
	DeadLetters ActorRef
	// NoSender 无发送者引用
	NoSender ActorRef
)
View Source
var (
	// ErrFutureTimeout 表示Future等待超时
	ErrFutureTimeout = errors.New("future: timeout waiting for result")
	// ErrFutureCancelled 表示Future被取消
	ErrFutureCancelled = errors.New("future: operation cancelled")
)
View Source
var (
	// ErrSystemNameTaken 系统名称已被占用
	ErrSystemNameTaken = errors.New("system name already taken")
	// ErrSystemNotFound 系统未找到
	ErrSystemNotFound = errors.New("system not found")
	// ErrPathExists 路径已存在
	ErrPathExists = errors.New("path already exists")
	// ErrParentNotExists 父路径不存在
	ErrParentNotExists = errors.New("parent path not exists")
	// ErrSystemShutdown 系统已关闭
	ErrSystemShutdown = errors.New("system is shutdown")
)

Functions

func ConfigureProfiling

func ConfigureProfiling(config ProfilingConfig)

ConfigureProfiling configures profiling based on environment variables or provided config

func DisableMetrics

func DisableMetrics()

DisableMetrics disables metrics collection

func EnableMetrics

func EnableMetrics()

EnableMetrics enables metrics collection

func ExtractRequestMessage

func ExtractRequestMessage(msg interface{}) (interface{}, string, bool)

ExtractRequestMessage 从Future请求消息中提取原始消息和correlation ID 返回 (原始消息, correlation ID, 是否是Future请求)

func GenerateID

func GenerateID() string

GenerateID 生成唯一ID(便捷函数)

func GenerateUniqueName

func GenerateUniqueName() string

GenerateUniqueName 生成唯一actor名称

func GetGoroutineCount

func GetGoroutineCount() int

GetGoroutineCount returns the current goroutine count

func GetMemStats

func GetMemStats() runtime.MemStats

GetMemStats returns memory statistics

func GetProfilingEndpoints

func GetProfilingEndpoints() []string

GetProfilingEndpoints returns available profiling endpoints

func GetProfilingURL

func GetProfilingURL(port string) string

GetProfilingURL returns the URL for accessing profiling data

func GetRuntimeStats

func GetRuntimeStats() map[string]interface{}

GetRuntimeStats returns current runtime statistics

func IsMetricsEnabled

func IsMetricsEnabled() bool

IsMetricsEnabled returns true if metrics collection is enabled

func IsProfilingEnabled

func IsProfilingEnabled() bool

IsProfilingEnabled returns true if profiling is enabled

func IsValidPath

func IsValidPath(path string) bool

IsValidPath 验证路径格式

func PrintMetrics

func PrintMetrics()

PrintMetrics prints the current metrics to stdout

func PrintProfilingInstructions

func PrintProfilingInstructions(port string)

PrintProfilingInstructions prints instructions for using profiling

func ProfilingInfo

func ProfilingInfo() (enabled bool, port string, started bool)

ProfilingInfo returns information about the profiling configuration

func RecordActorCreation

func RecordActorCreation()

RecordActorCreation records an actor creation

func RecordActorTermination

func RecordActorTermination()

RecordActorTermination records an actor termination

func RecordLatency

func RecordLatency(latency int64)

RecordLatency records a latency value in nanoseconds

func RecordMessageReceived

func RecordMessageReceived()

RecordMessageReceived records a received message

func RecordMessageSent

func RecordMessageSent()

RecordMessageSent records a sent message

func RecordRegistryLookup

func RecordRegistryLookup()

RecordRegistryLookup records a registry lookup

func ResetMetrics

func ResetMetrics()

ResetMetrics resets all metrics to zero

func SendResponse

func SendResponse(context ActorContext, msg interface{}, result interface{}, err error) bool

SendResponse 发送Future响应给sender 供接收Future请求的Actor使用 context是调用者的ActorContext msg是接收到的原始消息(包含correlation ID) result是响应结果 err是响应错误(可选)

func SetFutureLogLevel

func SetFutureLogLevel(level LogLevel)

SetFutureLogLevel 设置全局Future日志级别

func SetFutureLogger

func SetFutureLogger(logger FutureLogger)

SetFutureLogger 设置全局Future日志器

func ShutdownDefaultWorkerPool

func ShutdownDefaultWorkerPool()

ShutdownDefaultWorkerPool 关闭默认worker池

func StartCPUProfiling

func StartCPUProfiling(filename string) func() error

StartCPUProfiling starts CPU profiling (manual trigger) Returns a stop function that should be called to stop profiling and write the profile

func StartCPUProfilingWithDuration

func StartCPUProfilingWithDuration(filename string, duration time.Duration) error

StartCPUProfilingWithDuration starts CPU profiling for a specified duration Automatically stops profiling after the duration and writes to the specified file

func StopProfiling

func StopProfiling()

StopProfiling stops the pprof server

func ValidatePath

func ValidatePath(path string) error

ValidatePath 验证路径并返回错误

Types

type Actor

type Actor interface {
	// Receive 处理接收到的消息,通过ctx获取上下文信息
	Receive(ctx ActorContext)
}

Actor 定义Actor接口,处理接收到的消息

type ActorCell

type ActorCell struct {
	Props         *Props
	Mailbox       Mailbox
	Dispatcher    Dispatcher
	Ref           ActorRef
	Parent        ActorRef
	Children      map[string]ActorRef // 使用map实现O(1)查找和删除
	ChildrenMutex sync.RWMutex        // 保护Children的读写锁
	State         atomic.Int32        // LifecycleState
	// contains filtered or unexported fields
}

ActorCell 封装actor的元数据和状态

func NewActorCell

func NewActorCell(props *Props, ref ActorRef, parent ActorRef) *ActorCell

NewActorCell 创建新的actor cell

func (*ActorCell) AddChild

func (c *ActorCell) AddChild(ref ActorRef)

AddChild 添加子actor

func (*ActorCell) CompareAndSwapState

func (c *ActorCell) CompareAndSwapState(oldState, newState LifecycleState) bool

CompareAndSwapState 比较并交换状态

func (*ActorCell) FutureManager

func (c *ActorCell) FutureManager() FutureManager

FutureManager 获取FutureManager

func (*ActorCell) GetChildren

func (c *ActorCell) GetChildren() []ActorRef

GetChildren 获取所有子actor

func (*ActorCell) GetState

func (c *ActorCell) GetState() LifecycleState

GetState 获取当前状态

func (*ActorCell) IsAlive

func (c *ActorCell) IsAlive() bool

IsAlive 检查actor是否存活

func (*ActorCell) Path

func (c *ActorCell) Path() string

Path 获取actor的路径

func (*ActorCell) RemoveChild

func (c *ActorCell) RemoveChild(ref ActorRef)

RemoveChild 移除子actor

func (*ActorCell) SetFutureManager

func (c *ActorCell) SetFutureManager(manager FutureManager)

SetFutureManager 设置FutureManager

func (*ActorCell) SetState

func (c *ActorCell) SetState(state LifecycleState)

SetState 设置状态(原子操作)

type ActorContext

type ActorContext interface {
	// Self 获取actor自己的引用
	Self() ActorRef

	// Sender 获取消息发送者引用
	Sender() ActorRef

	// Message 获取当前正在处理的消息
	Message() interface{}

	// Tell 发送消息给其他actor
	Tell(ref ActorRef, msg interface{}) error

	// Ask 发送消息并等待响应(Future模式)
	// 返回一个Future,可以等待结果或注册回调
	Ask(target ActorRef, msg interface{}, timeout time.Duration) (Future, error)

	// Stop 停止当前actor
	Stop() error

	// Spawn 创建子actor
	Spawn(props *Props) (ActorRef, error)

	// Path 获取actor路径
	Path() string

	// Parent 获取父actor引用
	Parent() ActorRef
}

ActorContext 提供actor的元信息和操作接口

type ActorRef

type ActorRef interface {
	// Tell 异步发送消息给actor
	Tell(msg interface{}) error

	// Path 返回actor的路径
	Path() string

	// IsAlive 检查actor是否存活
	IsAlive() bool

	// Equals 比较两个引用是否指向同一个actor
	Equals(other ActorRef) bool
}

ActorRef 代表对actor的引用

func NewActorRef

func NewActorRef(path string, system ActorSystem, registry Registry) ActorRef

NewActorRef 创建actor引用

type ActorSystem

type ActorSystem interface {
	// Name 返回系统名称
	Name() string

	// Spawn 创建顶级actor
	Spawn(props *Props) (ActorRef, error)

	// SpawnWithName 创建指定名称的顶级actor
	SpawnWithName(props *Props, name string) (ActorRef, error)

	// Stop 停止指定actor
	Stop(ref ActorRef) error

	// Suspend 暂停actor
	Suspend(ref ActorRef) error

	// Resume 恢复actor
	Resume(ref ActorRef) error

	// Restart 重启actor
	Restart(ref ActorRef) error

	// Lookup 按路径查找actor
	Lookup(path string) (ActorRef, bool)

	// Shutdown 优雅关闭整个系统
	Shutdown(timeout *int) error

	// ShutdownNow 立即关闭系统
	ShutdownNow()

	// DeadLetters 返回死信actor引用
	DeadLetters() ActorRef

	// Root 获取根guardian引用
	Root() ActorRef

	// User 获取user guardian引用
	User() ActorRef

	// System 获取system guardian引用
	System() ActorRef

	// SystemActorOf 创建系统级actor
	SystemActorOf(props *Props, name string) (ActorRef, error)

	// LookupDispatcher 按名称查找dispatcher
	LookupDispatcher(name string) (Dispatcher, bool)

	// RegisterDispatcher 注册dispatcher
	RegisterDispatcher(name string, dispatcher Dispatcher)
}

ActorSystem actor系统接口

func GetSystem

func GetSystem(name string) (ActorSystem, error)

GetSystem 获取系统实例

func NewActorSystem

func NewActorSystem(name string, opts ...ConfigOption) (ActorSystem, error)

NewActorSystem 创建新的actor系统

func NewActorSystemFromConfig

func NewActorSystemFromConfig(config *Config) (ActorSystem, error)

NewActorSystemFromConfig 从配置创建actor系统

type AllForOneStrategy

type AllForOneStrategy struct {
	// contains filtered or unexported fields
}

AllForOneStrategy AllForOne监督策略

func (*AllForOneStrategy) Decider

func (s *AllForOneStrategy) Decider(child ActorRef, reason interface{}) Directive

AllForOneStrategy 实现 SupervisorStrategy 接口

func (*AllForOneStrategy) HandleFailure

func (s *AllForOneStrategy) HandleFailure(child ActorRef, reason interface{})

func (*AllForOneStrategy) SetSystem

func (s *AllForOneStrategy) SetSystem(system ActorSystem)

SetSystem 设置ActorSystem引用

type Config

type Config struct {
	Name               string
	DefaultDispatcher  Dispatcher
	DefaultMailboxSize int
	DefaultSupervisor  SupervisorStrategy
	RootGuardian       string
	UserGuardian       string
	SystemGuardian     string
}

Config 系统配置

func NewConfig

func NewConfig(name string, opts ...ConfigOption) *Config

NewConfig 创建配置

type ConfigOption

type ConfigOption func(*Config)

ConfigOption 配置选项函数

func WithDefaultDispatcher

func WithDefaultDispatcher(d Dispatcher) ConfigOption

WithDefaultDispatcher 设置默认dispatcher

func WithDefaultMailboxSize

func WithDefaultMailboxSize(size int) ConfigOption

WithDefaultMailboxSize 设置默认mailbox大小

func WithDefaultPinnedDispatcher

func WithDefaultPinnedDispatcher() ConfigOption

WithDefaultPinnedDispatcher 设置默认的pinning dispatcher

func WithDefaultSupervisor

func WithDefaultSupervisor(s SupervisorStrategy) ConfigOption

WithDefaultSupervisor 设置默认监督策略

func WithWorkerPool

func WithWorkerPool(workerNum int) ConfigOption

WithWorkerPool 启用worker池优化

type CustomFutureLogger

type CustomFutureLogger struct {
	DebugFunc func(format string, args ...interface{})
	InfoFunc  func(format string, args ...interface{})
	WarnFunc  func(format string, args ...interface{})
	ErrorFunc func(format string, args ...interface{})
}

CustomFutureLogger 允许用户自定义日志实现

func (*CustomFutureLogger) Debug

func (l *CustomFutureLogger) Debug(format string, args ...interface{})

Debug 实现

func (*CustomFutureLogger) Error

func (l *CustomFutureLogger) Error(format string, args ...interface{})

Error 实现

func (*CustomFutureLogger) Info

func (l *CustomFutureLogger) Info(format string, args ...interface{})

Info 实现

func (*CustomFutureLogger) Warn

func (l *CustomFutureLogger) Warn(format string, args ...interface{})

Warn 实现

type DeadLetter

type DeadLetter struct {
	Message   interface{}
	Sender    ActorRef
	Recipient ActorRef
	Timestamp string // RFC3339格式时间戳
}

DeadLetter 死信结构

type DeadLettersActor

type DeadLettersActor struct {
	*DefaultActor
	// contains filtered or unexported fields
}

DeadLettersActor 死信actor实现

func (*DeadLettersActor) GetDeadLetters

func (a *DeadLettersActor) GetDeadLetters() []DeadLetter

GetDeadLetters 返回保存的死信列表

func (*DeadLettersActor) GetDropCount

func (a *DeadLettersActor) GetDropCount() int64

GetDropCount 返回丢弃的消息总数

func (*DeadLettersActor) Receive

func (a *DeadLettersActor) Receive(ctx ActorContext)

type Decider

type Decider func(reason interface{}) Directive

Decider 决策函数类型

type DefaultActor

type DefaultActor struct{}

DefaultActor 默认actor实现,不处理任何消息

func (*DefaultActor) Receive

func (a *DefaultActor) Receive(ctx ActorContext)

Receive 默认实现:空实现

type DefaultFutureManager

type DefaultFutureManager struct {
	// contains filtered or unexported fields
}

DefaultFutureManager 是FutureManager的默认实现

func (*DefaultFutureManager) Cancel

func (m *DefaultFutureManager) Cancel(correlationID string) bool

Cancel 取消一个Future

func (*DefaultFutureManager) Cleanup

func (m *DefaultFutureManager) Cleanup()

Cleanup 清理已完成或超时的Future

func (*DefaultFutureManager) Complete

func (m *DefaultFutureManager) Complete(responseMsg ResponseMessage) bool

Complete 完成一个Future,将响应传递给等待者

func (*DefaultFutureManager) Count

func (m *DefaultFutureManager) Count() int

Count 返回当前活跃的Future数量

func (*DefaultFutureManager) Create

func (m *DefaultFutureManager) Create(target ActorRef, msg interface{}, timeout time.Duration) (Future, error)

Create 创建一个新的Future并发送消息

type Directive

type Directive int

Directive 定义监督决策指令

const (
	Resume   Directive = iota // 恢复子actor
	Restart                   // 重启子actor
	Stop                      // 停止子actor
	Escalate                  // 向上传播故障
)

func DefaultDecider

func DefaultDecider(reason interface{}) Directive

DefaultDecider 默认决策函数

func EscalatingDecider

func EscalatingDecider(reason interface{}) Directive

EscalatingDecider 总是向上传播故障

func StoppingDecider

func StoppingDecider(reason interface{}) Directive

StoppingDecider 总是停止故障actor

func (Directive) String

func (d Directive) String() string

type Dispatcher

type Dispatcher interface {
	// Schedule 调度任务执行
	Schedule(fn func())

	// Throughput 返回每次处理的消息数量
	Throughput() int

	// Name 返回dispatcher名称
	Name() string
}

Dispatcher 调度器接口,用于调度actor的消息处理

func NewCallingThreadDispatcher

func NewCallingThreadDispatcher() Dispatcher

NewCallingThreadDispatcher 创建调用线程dispatcher

func NewDefaultDispatcher

func NewDefaultDispatcher() Dispatcher

NewDefaultDispatcher 创建默认dispatcher

func NewPinnedDispatcher

func NewPinnedDispatcher() Dispatcher

NewPinnedDispatcher 创建固定dispatcher

func NewSyncDispatcher

func NewSyncDispatcher() Dispatcher

NewSyncDispatcher 创建同步dispatcher

type Future

type Future interface {
	// CorrelationID 返回Future的关联ID
	CorrelationID() string

	// Result 等待并返回结果,支持超时控制
	// 如果超时则返回 ErrFutureTimeout
	// 如果被取消则返回 ErrFutureCancelled
	// 如果操作失败则返回相应的错误
	Result(timeout time.Duration) (interface{}, error)

	// Await 是Result的别名,提供更简洁的API
	Await(timeout time.Duration) (interface{}, error)

	// OnComplete 注册一个回调函数,当Future完成时被调用
	// 如果Future已经完成,回调将立即被调用
	OnComplete(callback func(result interface{}, err error))

	// Cancel 取消Future操作
	Cancel()

	// IsReady 检查Future是否已完成
	IsReady() bool

	// State 返回Future当前的状态
	State() FutureState
}

Future 表示一个异步操作的结果

type FutureCallback

type FutureCallback func(result interface{}, err error)

FutureCallback 表示Future完成时的回调函数

type FutureImpl

type FutureImpl struct {
	// contains filtered or unexported fields
}

FutureImpl 是Future接口的实现

func NewFuture

func NewFuture(correlationID string) *FutureImpl

NewFuture 创建一个新的Future实例

func (*FutureImpl) Await

func (f *FutureImpl) Await(timeout time.Duration) (interface{}, error)

Await 是Result的别名,提供更简洁的API

func (*FutureImpl) Cancel

func (f *FutureImpl) Cancel()

Cancel 取消Future操作

func (*FutureImpl) CorrelationID

func (f *FutureImpl) CorrelationID() string

CorrelationID 返回Future的关联ID

func (*FutureImpl) IsReady

func (f *FutureImpl) IsReady() bool

IsReady 检查Future是否已完成

func (*FutureImpl) OnComplete

func (f *FutureImpl) OnComplete(callback func(result interface{}, err error))

OnComplete 注册一个回调函数,当Future完成时被调用

func (*FutureImpl) Result

func (f *FutureImpl) Result(timeout time.Duration) (interface{}, error)

Result 等待并返回结果,支持超时控制

func (*FutureImpl) State

func (f *FutureImpl) State() FutureState

State 返回Future当前的状态

type FutureLogger

type FutureLogger interface {
	// Debug 记录调试日志
	Debug(format string, args ...interface{})

	// Info 记录信息日志
	Info(format string, args ...interface{})

	// Warn 记录警告日志
	Warn(format string, args ...interface{})

	// Error 记录错误日志
	Error(format string, args ...interface{})
}

FutureLogger 定义Future的日志接口

func GetFutureLogger

func GetFutureLogger() FutureLogger

GetFutureLogger 获取当前的Future日志器

func NewCustomLogger

func NewCustomLogger(
	debugFunc func(format string, args ...interface{}),
	infoFunc func(format string, args ...interface{}),
	warnFunc func(format string, args ...interface{}),
	errorFunc func(format string, args ...interface{}),
) FutureLogger

NewCustomLogger 创建自定义日志器 参数都是可选的,nil表示该级别的日志被禁用

func NewJSONFutureLogger

func NewJSONFutureLogger(output *os.File, level LogLevel) FutureLogger

NewJSONFutureLogger 创建JSON日志器

type FutureManager

type FutureManager interface {
	// Create 创建一个新的Future并发送消息
	// 为消息包装correlation ID并返回Future实例
	Create(target ActorRef, msg interface{}, timeout time.Duration) (Future, error)

	// Complete 完成一个Future,将响应传递给等待者
	Complete(responseMsg ResponseMessage) bool

	// Cancel 取消一个Future
	Cancel(correlationID string) bool

	// Cleanup 清理已完成或超时的Future
	Cleanup()

	// Count 返回当前活跃的Future数量
	Count() int
}

FutureManager 管理Actor的所有Future请求

func NewFutureManager

func NewFutureManager() FutureManager

NewFutureManager 创建一个新的FutureManager

type FutureState

type FutureState int32

FutureState 表示Future的状态

const (
	// FutureStatePending 表示Future等待中
	FutureStatePending FutureState = iota
	// FutureStateCompleted 表示Future已完成(成功或失败)
	FutureStateCompleted
	// FutureStateTimeout表示Future超时
	FutureStateTimeout
	// FutureStateCancelled 表示Future已取消
	FutureStateCancelled
)

func (FutureState) String

func (s FutureState) String() string

String 返回状态的字符串表示

type GuardianActor

type GuardianActor struct {
	*DefaultActor
	// contains filtered or unexported fields
}

GuardianActor guardian actor实现

func (*GuardianActor) Receive

func (g *GuardianActor) Receive(ctx ActorContext)

type IDGenerator

type IDGenerator struct {
	// contains filtered or unexported fields
}

IDGenerator 轻量级ID生成器,用于替代uuid.New()

func GetGlobalIDGenerator

func GetGlobalIDGenerator() *IDGenerator

GetGlobalIDGenerator 获取全局ID生成器(单例)

func NewIDGenerator

func NewIDGenerator() *IDGenerator

NewIDGenerator 创建新的ID生成器

func (*IDGenerator) Generate

func (g *IDGenerator) Generate() string

Generate 生成唯一的ID字符串 格式: {nodeID}-{counter}

type JSONFutureLogger

type JSONFutureLogger struct {
	// contains filtered or unexported fields
}

JSONFutureLogger 输出JSON格式的日志,适合日志聚合系统

func (*JSONFutureLogger) Debug

func (l *JSONFutureLogger) Debug(format string, args ...interface{})

Debug 实现

func (*JSONFutureLogger) Error

func (l *JSONFutureLogger) Error(format string, args ...interface{})

Error 实现

func (*JSONFutureLogger) Info

func (l *JSONFutureLogger) Info(format string, args ...interface{})

Info 实现

func (*JSONFutureLogger) Warn

func (l *JSONFutureLogger) Warn(format string, args ...interface{})

Warn 实现

type LifecycleActor

type LifecycleActor interface {
	Actor

	// PreStart 在actor启动前调用
	PreStart(ctx ActorContext) error

	// PostStop 在actor停止后调用
	PostStop(ctx ActorContext) error

	// PreRestart 在actor重启前调用(子actor停止后)
	PreRestart(ctx ActorContext, reason interface{})

	// PostRestart 在actor重启后调用(PreStart之前)
	PostRestart(ctx ActorContext, reason interface{})
}

LifecycleActor 定义生命周期挂钩接口

type LifecycleManager

type LifecycleManager struct {
	// contains filtered or unexported fields
}

LifecycleManager 管理actor生命周期

func NewLifecycleManager

func NewLifecycleManager(cell *ActorCell) *LifecycleManager

NewLifecycleManager 创建生命周期管理器

func (*LifecycleManager) CanTransition

func (lm *LifecycleManager) CanTransition(from, to LifecycleState) bool

CanTransition 检查是否可以转换状态

type LifecycleState

type LifecycleState int32

LifecycleState 定义actor生命周期状态

const (
	Created        LifecycleState = iota // 已创建但未启动
	StartInitiated                       // 启动中
	Started                              // 已启动,正常运行
	Suspended                            // 已暂停,不处理消息
	Stopping                             // 停止中
	Stopped                              // 已停止
	Restarter                            // 重启中
)

func (LifecycleState) String

func (s LifecycleState) String() string

type LogLevel

type LogLevel int

LogLevel 定义日志级别

const (
	// LogLevelNone 禁用所有日志
	LogLevelNone LogLevel = iota
	// LogLevelError 只记录错误
	LogLevelError
	// LogLevelWarn 记录错误和警告
	LogLevelWarn
	// LogLevelInfo 记录错误、警告和信息
	LogLevelInfo
	// LogLevelDebug 记录所有日志(包括调试信息)
	LogLevelDebug
)

type Mailbox

type Mailbox interface {
	// PostUserMessage 投递用户消息
	PostUserMessage(msg interface{})

	// PostSystemMessage 投递系统消息
	PostSystemMessage(msg interface{})

	// UserMessageCount 获取用户消息数量
	UserMessageCount() int
}

Mailbox 邮箱接口,用于消息处理和故障上报

func NewMailbox

func NewMailbox(cell *ActorCell, dispatcher Dispatcher) Mailbox

NewMailbox 创建新的mailbox

type MessageInvoker

type MessageInvoker interface {
	// Invoke 调用actor处理消息
	Invoke(receiver Actor, msg interface{})

	// EscalateFailure 将故障上报给监督者
	EscalateFailure(reason string, msg interface{})
}

MessageInvoker 邮箱接口,用于消息处理和故障上报

type Metrics

type Metrics struct {
	MessagesSent      atomic.Uint64
	MessagesReceived  atomic.Uint64
	ActorCreations    atomic.Uint64
	ActorTerminations atomic.Uint64
	RegistryLookups   atomic.Uint64
	LockAcquisitions  atomic.Uint64

	// Latency measurements (in nanoseconds)
	TotalLatency atomic.Uint64
	LatencyCount atomic.Uint64

	// Start time for rate calculations
	StartTime atomic.Int64
}

Metrics represents performance metric collection

func GetMetrics

func GetMetrics() *Metrics

GetMetrics returns the current metrics

type MetricsSnapshot

type MetricsSnapshot struct {
	MessagesPerSec     float64
	MessagesReceived   uint64
	MessagesSent       uint64
	ActiveActors       uint64
	ActorCreations     uint64
	RegistryLookupsSec float64
	AvgLatency         float64
	Uptime             time.Duration
}

MetricsSnapshot represents a snapshot of current metrics

func GetMetricsSnapshot

func GetMetricsSnapshot() MetricsSnapshot

GetMetricsSnapshot returns a snapshot of current metrics with calculated rates

type OneForOneStrategy

type OneForOneStrategy struct {
	// contains filtered or unexported fields
}

OneForOneStrategy OneForOne监督策略

func (*OneForOneStrategy) Decider

func (s *OneForOneStrategy) Decider(child ActorRef, reason interface{}) Directive

OneForOneStrategy 实现 SupervisorStrategy 接口

func (*OneForOneStrategy) HandleFailure

func (s *OneForOneStrategy) HandleFailure(child ActorRef, reason interface{})

func (*OneForOneStrategy) SetSystem

func (s *OneForOneStrategy) SetSystem(system ActorSystem)

SetSystem 设置ActorSystem引用

type ProfilingConfig

type ProfilingConfig struct {
	Enabled bool
	Port    string
}

ProfilingConfig holds profiling configuration

func DefaultProfilingConfig

func DefaultProfilingConfig() ProfilingConfig

DefaultProfilingConfig returns default profiling configuration

type Props

type Props struct {
	Actor      func() Actor     // actor工厂函数
	Name       string           // actor名称
	Mailbox    Mailbox          // 自定义mailbox(可选)
	Dispatcher Dispatcher       // 自定义dispatcher(可选)
	Supervisor SupervisorConfig // 监督配置
}

Props 用于配置actor的创建参数

func NewProps

func NewProps(actorFunc func() Actor) *Props

NewProps 创建默认Props

func (*Props) WithActor

func (p *Props) WithActor(actorFunc func() Actor) *Props

WithActor 设置actor工厂函数

func (*Props) WithDispatcher

func (p *Props) WithDispatcher(dispatcher Dispatcher) *Props

WithDispatcher 设置dispatcher

func (*Props) WithMailbox

func (p *Props) WithMailbox(mailbox Mailbox) *Props

WithMailbox 设置mailbox

func (*Props) WithName

func (p *Props) WithName(name string) *Props

WithName 设置actor名称,返回指针以支持链式调用

func (*Props) WithSupervisor

func (p *Props) WithSupervisor(strategy SupervisorStrategy) *Props

WithSupervisor 设置监督策略

type PropsFunc

type PropsFunc func(*Props)

PropsFunc 用于配置Props的函数类型

type Registry

type Registry interface {
	// Register 注册actor到指定路径
	Register(path string, cell *ActorCell) error

	// Unregister 注销actor
	Unregister(path string)

	// Lookup 查找路径对应的actor
	Lookup(path string) (*ActorCell, bool)

	// Resolve 解析路径字符串为actor引用
	Resolve(path string) (ActorRef, bool)

	// All 返回所有注册的actor路径
	All() []string

	// Children 返回父路径下所有子actor路径
	Children(parent string) []string

	// Count 返回注册的actor总数
	Count() int
}

Registry 接口用于管理actor注册

func NewRegistry

func NewRegistry() Registry

NewRegistry 创建新的注册表

func NewShardedRegistry

func NewShardedRegistry() Registry

NewShardedRegistry 创建新的分片注册表

type ResponseMessage

type ResponseMessage struct {
	CorrelationID string      // 关联ID,用于匹配Future请求
	Result        interface{} // 响应结果
	Error         error       // 响应错误
}

ResponseMessage 表示Future的响应消息

func CreateResponse

func CreateResponse(correlationID string, result interface{}, err error) ResponseMessage

CreateResponse 创建Future响应消息

type Supervisor

type Supervisor interface {
	// HandleChildFailure 处理子actor故障
	HandleChildFailure(child *ActorCell, reason interface{})
}

Supervisor 监督者接口

type SupervisorConfig

type SupervisorConfig struct {
	Strategy     SupervisorStrategy
	StopChildren bool
}

SupervisorConfig 监督配置

type SupervisorStrategy

type SupervisorStrategy interface {
	// Decider 决定如何处理子actor故障
	Decider(child ActorRef, reason interface{}) Directive

	// HandleFailure 处理子actor失败
	HandleFailure(child ActorRef, reason interface{})

	// SetSystem 设置ActorSystem引用
	SetSystem(system ActorSystem)
}

SupervisorStrategy 监督策略接口

func NewAllForOneStrategy

func NewAllForOneStrategy(maxNrOfRetries int, withinTimeRange time.Duration, decider Decider) SupervisorStrategy

NewAllForOneStrategy 创建AllForOne策略

func NewOneForOneStrategy

func NewOneForOneStrategy(maxNrOfRetries int, withinTimeRange time.Duration, decider Decider) SupervisorStrategy

NewOneForOneStrategy 创建OneForOne策略

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool goroutine池,用于复用goroutine减少调度开销

func GetDefaultWorkerPool

func GetDefaultWorkerPool() *WorkerPool

GetDefaultWorkerPool 获取默认的worker池(单例)

func NewWorkerPool

func NewWorkerPool(workerNum int) *WorkerPool

NewWorkerPool 创建新的goroutine池

func (*WorkerPool) Schedule

func (p *WorkerPool) Schedule(fn func())

Schedule 调度任务执行

func (*WorkerPool) Shutdown

func (p *WorkerPool) Shutdown()

Shutdown 优雅关闭goroutine池

func (*WorkerPool) TaskQueueLen

func (p *WorkerPool) TaskQueueLen() int

TaskQueueLen 返回任务队列长度

func (*WorkerPool) WorkerCount

func (p *WorkerPool) WorkerCount() int

WorkerCount 返回worker数量

Directories

Path Synopsis
utils

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL