The sections below group the most commonly used public modules. Each entry is
generated with Sphinx autodoc so it stays in sync with the codebase. Docstrings
follow the NumPy style.
Aggregators combine multiple gradient vectors from different nodes into a
single aggregated gradient. This is the core mechanism for Byzantine-robust
distributed learning, as aggregators must be resilient to malicious or
corrupted gradients.
Subclasses must implement aggregate() to define the aggregation
strategy. The base class exposes the Operator interface so aggregators can
be scheduled inside computation graphs just like any other operator.
Aggregators can optionally support parallel execution via subtasks by
setting supports_subtasks=True and implementing
create_subtasks() and reduce_subtasks().
Reduce a sequence of gradient tensors into a single aggregated tensor.
This is the core method that subclasses must implement. It receives a
sequence of gradient vectors (one per node) and returns a single
aggregated gradient vector.
Parameters:
gradients (Sequence[Any]) – Ordered sequence of gradient tensors (or tensor-like objects).
All gradients must have the same shape and be from the same
backend (NumPy, PyTorch, etc.). The sequence should be non-empty.
Returns:
A tensor with the same shape, dtype, and device/backend as the
input gradients representing the aggregated gradient.
Return type:
Any
Raises:
ValueError – If the gradients sequence is empty or invalid.
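To make this contract concrete, here is a minimal standalone sketch. The Aggregator base-class name and the aggregate() signature are assumptions taken from the description above, and MeanAggregator is a toy example, not a class provided by the library.

from typing import Any, Sequence
import numpy as np

class Aggregator:  # stand-in for the library's base class
    def aggregate(self, gradients: Sequence[Any]) -> Any:
        raise NotImplementedError

class MeanAggregator(Aggregator):
    """Toy subclass: plain coordinate-wise mean (not Byzantine-robust)."""

    def aggregate(self, gradients: Sequence[Any]) -> Any:
        if not gradients:
            raise ValueError("gradients must be non-empty")
        # Stack the per-node gradients into shape (n, d) and average over nodes.
        return np.mean(np.stack(gradients), axis=0)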
This aggregator computes the median independently for each coordinate
(dimension) across all input gradients. It is robust to up to 50% Byzantine
nodes when the honest gradients are well-separated.
The median is computed coordinate-wise, meaning for each dimension d, the
output is the median of all gradients’ d-th coordinate. This makes it
computationally efficient and suitable for high-dimensional gradients.
The aggregator supports parallel execution via subtasks, chunking the
computation across multiple workers for improved performance on large
gradients.
Parameters:
chunk_size (int, optional) – Size of chunks for parallel processing. Larger values reduce overhead
but may limit parallelism. Default is 8192.
gradients (Sequence[Any]) – Non-empty sequence of gradient tensors/arrays. All must have the
same shape and be from the same backend (NumPy, PyTorch, etc.).
Returns:
Aggregated gradient tensor with the same shape and backend as
inputs. Each coordinate is the median of that coordinate across
all input gradients.
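The per-coordinate computation is equivalent to the following NumPy sketch (the math only, not the library's chunked, subtask-based implementation):

import numpy as np

def coordinate_wise_median(gradients):
    """Median of each coordinate across the stacked gradients."""
    stacked = np.stack(gradients)        # shape: (n_nodes, d)
    return np.median(stacked, axis=0)    # shape: (d,)

# Example: one outlying value per coordinate is ignored.
grads = [np.array([1.0, 2.0]), np.array([3.0, 0.0]), np.array([100.0, -5.0])]
assert np.allclose(coordinate_wise_median(grads), [3.0, 0.0])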
This aggregator computes a trimmed mean independently for each coordinate.
For each dimension, it sorts the values, removes the f smallest and f
largest values, and averages the remaining (n - 2f) values.
This makes it robust to up to f Byzantine nodes on each coordinate when
honest gradients are well-separated from outliers.
Parameters:
f (int) – Number of extreme values to trim from each end. Must satisfy
0 <= 2f < n where n is the number of gradients.
chunk_size (int, optional) – Size of chunks for parallel processing. Default is 4096.
Compute coordinate-wise trimmed mean of gradients.
Parameters:
gradients (Sequence[Any]) – Non-empty sequence of gradient tensors. All must have the same
shape and backend. The sequence length n must satisfy 2f < n.
Returns:
Aggregated gradient tensor with the same shape and backend as
inputs. Each coordinate is the mean of the middle (n-2f) values
after sorting.
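A compact NumPy sketch of the same computation (illustrative only, without the chunked parallel path):

import numpy as np

def coordinate_wise_trimmed_mean(gradients, f):
    """Drop the f smallest and f largest values per coordinate, then average."""
    stacked = np.stack(gradients)            # shape: (n, d)
    n = stacked.shape[0]
    if not 0 <= 2 * f < n:
        raise ValueError("must satisfy 0 <= 2f < n")
    ordered = np.sort(stacked, axis=0)       # sort each coordinate independently
    return ordered[f:n - f].mean(axis=0)     # mean of the middle n - 2f values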
This is the original Krum algorithm, which selects the single gradient
with the smallest Krum score (sum of distances to nearest neighbors) and
returns it directly (without averaging).
Parameters:
f (int) – Maximum number of Byzantine nodes to tolerate. Must satisfy
0 <= f < n-1 where n is the number of gradients.
chunk_size (int, optional) – Size of chunks for parallel distance computation. Default is 32.
Multi-Krum aggregator for Byzantine-robust gradient aggregation.
Multi-Krum is a geometric aggregation method that selects the q gradients
with the smallest sum of squared distances to their nearest neighbors,
then returns their mean. This makes it robust to up to f Byzantine nodes.
Algorithm:
1. For each gradient i, compute the sum of squared Euclidean distances to
its (n - f - 1) nearest neighbors (excluding itself).
2. Select the q gradients with the smallest sums (Krum scores).
3. Return the mean of the selected gradients.
Parameters:
f (int) – Maximum number of Byzantine nodes to tolerate. Must satisfy
0 <= f < n-1 where n is the number of gradients.
q (int) – Number of gradients to select. Must satisfy 1 <= q <= n - f.
chunk_size (int, optional) – Size of chunks for parallel distance computation. Default is 32.
Select the q most consensus gradients using the Krum score.
Parameters:
gradients (Sequence[Any]) – Sequence of gradient tensors. All must have the same shape and
backend. The sequence length n must satisfy f < n-1 and q <= n-f.
Returns:
Mean of the q selected gradients. Same shape and backend as inputs.
Return type:
Any
Raises:
ValueError – If gradients is empty, or if f >= n-1, or if q > n-f.
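The scoring and selection described above can be sketched in a few lines of NumPy (illustrative only; the library computes distances in chunks across workers):

import numpy as np

def multi_krum(gradients, f, q):
    """Average the q gradients with the smallest Krum scores."""
    x = np.stack(gradients)                               # shape: (n, d)
    n = x.shape[0]
    # Pairwise squared Euclidean distances; exclude self-distances.
    dists = ((x[:, None, :] - x[None, :, :]) ** 2).sum(axis=-1)
    np.fill_diagonal(dists, np.inf)
    # Krum score: sum of distances to the n - f - 1 nearest neighbours.
    scores = np.sort(dists, axis=1)[:, :n - f - 1].sum(axis=1)
    selected = np.argsort(scores)[:q]                     # q smallest scores
    return x[selected].mean(axis=0)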
Enumerates all (n - f)-sized subsets, selects the one whose empirical
covariance has the smallest maximum eigenvalue, and returns that subset’s
mean. For efficiency the eigenproblem is solved on the m×m Gram matrix rather
than the full covariance, and evaluation can be chunked via the task scheduler.
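A brute-force sketch of the subset search (illustrative only; the library evaluates subsets in chunks through the task scheduler):

from itertools import combinations
import numpy as np

def smallest_max_eigenvalue_mean(gradients, f):
    """Average the (n - f)-sized subset whose empirical covariance has the
    smallest largest eigenvalue."""
    x = np.stack(gradients)                     # shape: (n, d)
    n = x.shape[0]
    m = n - f                                   # subset size
    best_subset, best_val = None, np.inf
    for subset in combinations(range(n), m):
        sub = x[list(subset)]
        centred = sub - sub.mean(axis=0)
        # The m x m Gram matrix shares its non-zero eigenvalues with the
        # d x d covariance, so the top eigenvalue can be read off cheaply.
        gram = centred @ centred.T / m
        top = np.linalg.eigvalsh(gram)[-1]      # eigenvalues in ascending order
        if top < best_val:
            best_subset, best_val = subset, top
    return x[list(best_subset)].mean(axis=0)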
Reduce a sequence of gradient tensors into a single aggregated tensor.
This is the core method that subclasses must implement. It receives a
sequence of gradient vectors (one per node) and returns a single
aggregated gradient vector.
Parameters:
gradients (Sequence[Any]) – Ordered sequence of gradient tensors (or tensor-like objects).
All gradients must have the same shape and be from the same
backend (NumPy, PyTorch, etc.). The sequence should be non-empty.
Returns:
A tensor with the same shape, dtype, and device/backend as the
input gradients representing the aggregated gradient.
Return type:
Any
Raises:
ValueError – If the gradients sequence is empty or invalid.
An iterative aggregation method that clips gradients based on their
distance from a running center estimate. The center is updated iteratively
by averaging clipped gradients.
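The iteration can be sketched as follows; the radius and iterations parameters here are placeholders for whatever the actual constructor exposes:

import numpy as np

def centered_clipping(gradients, radius=1.0, iterations=3):
    """Iteratively re-centre on the mean of deviations clipped to `radius`."""
    x = np.stack(gradients)                    # shape: (n, d)
    center = x.mean(axis=0)                    # crude initial centre estimate
    for _ in range(iterations):
        deltas = x - center
        norms = np.linalg.norm(deltas, axis=1, keepdims=True)
        # Shrink each deviation so its norm is at most `radius`.
        scale = np.minimum(1.0, radius / np.maximum(norms, 1e-12))
        center = center + (deltas * scale).mean(axis=0)
    return center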
Reduce a sequence of gradient tensors into a single aggregated tensor.
This is the core method that subclasses must implement. It receives a
sequence of gradient vectors (one per node) and returns a single
aggregated gradient vector.
Parameters:
gradients (Sequence[Any]) – Ordered sequence of gradient tensors (or tensor-like objects).
All gradients must have the same shape and be from the same
backend (NumPy, PyTorch, etc.). The sequence should be non-empty.
Returns:
A tensor with the same shape, dtype, and device/backend as the
input gradients representing the aggregated gradient.
Return type:
Any
Raises:
ValueError – If the gradients sequence is empty or invalid.
Attacks simulate malicious behavior in distributed learning by generating
adversarial gradient vectors. Subclasses implement different attack
strategies by setting input requirement flags and implementing apply().
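As a standalone sketch of this pattern (the Attack base-class name and the default flag values are assumptions based on the description below, and SignFlipAttack is a toy example, not a class shipped by the library):

from typing import Any, List, Optional

class Attack:  # stand-in for the library's base class
    uses_model_batch = False
    uses_honest_grads = False
    uses_base_grad = False

    def apply(self, *, model=None, x=None, y=None,
              honest_grads: Optional[List[Any]] = None,
              base_grad: Optional[Any] = None) -> Any:
        raise NotImplementedError

class SignFlipAttack(Attack):
    """Toy attack: send the negation of this node's honest gradient."""
    uses_base_grad = True

    def apply(self, *, base_grad=None, **_: Any) -> Any:
        if base_grad is None:
            raise ValueError("base_grad is required")
        return -base_grad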
This method implements the attack strategy. It receives the requested
inputs (based on the class’s uses_* flags) and returns a single
malicious gradient vector.
Parameters:
model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).
x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).
y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).
honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if
uses_honest_grads=True).
base_grad (Optional[Any]) – The node’s own honest gradient vector (required if
uses_base_grad=True).
Returns:
A single malicious gradient vector with the same shape, dtype, and
device/backend as the input gradients.
Return type:
Any
Raises:
ValueError – If required inputs are missing or invalid.
This attack computes the mean of honest gradients and scales it by a
factor (typically negative). It requires access to honest gradients from
other nodes.
The attack strategy is: g_malicious = scale * mean(honest_grads)
Parameters:
scale (float, optional) – Scaling factor for the mean. Default is -1.0 (inverted mean).
Negative values create adversarial gradients that point in the
opposite direction of the honest consensus.
chunk_size (int, optional) – Size of chunks for parallel mean computation. Default is 8.
Examples
>>> attack = EmpireAttack(scale=-1.0)
>>> honest_grads = [torch.randn(100) for _ in range(5)]
>>> malicious = attack.apply(honest_grads=honest_grads)
>>> assert malicious.shape == (100,)
>>> # malicious is approximately -mean(honest_grads)
Notes
Requires uses_honest_grads=True (needs access to honest gradients).
Supports parallel execution via subtasks for large gradients.
Time complexity: O(n * d) where n is the number of honest gradients and
d is the dimension. With subtasks: O(n * d / workers).
Memory complexity: O(n * d) for stacking gradients.
This method implements the attack strategy. It receives the requested
inputs (based on the class’s uses_* flags) and returns a single
malicious gradient vector.
Parameters:
model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).
x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).
y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).
honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if
uses_honest_grads=True).
base_grad (Optional[Any]) – The node’s own honest gradient vector (required if
uses_base_grad=True).
Returns:
A single malicious gradient vector with the same shape, dtype, and
device/backend as the input gradients.
Return type:
Any
Raises:
ValueError – If required inputs are missing or invalid.
Pre-aggregators transform a sequence of vectors before aggregation. They
can reshape, filter, or combine vectors in various ways. Unlike aggregators
which reduce to a single vector, pre-aggregators return a list of vectors
(possibly of different length than the input).
Common use cases include:
- Bucketing: Group vectors into buckets and average within each bucket.
- Clipping: Clip vectors to a maximum norm.
- Nearest-neighbor mixing: Combine vectors with their nearest neighbors.
Subclasses must implement pre_aggregate() to define the
transformation strategy.
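As a minimal illustration of this contract (the PreAggregator base-class name and the pre_aggregate() signature are assumptions from the description above), a norm-clipping pre-aggregator maps n vectors to n clipped vectors:

from typing import Any, List, Sequence
import numpy as np

class PreAggregator:  # stand-in for the library's base class
    def pre_aggregate(self, vectors: Sequence[Any]) -> List[Any]:
        raise NotImplementedError

class StaticClipping(PreAggregator):
    """Clip every vector to a maximum Euclidean norm."""

    def __init__(self, max_norm: float = 1.0):
        self.max_norm = max_norm

    def pre_aggregate(self, vectors: Sequence[Any]) -> List[Any]:
        clipped = []
        for v in vectors:
            norm = float(np.linalg.norm(v))
            clipped.append(v * min(1.0, self.max_norm / max(norm, 1e-12)))
        return clipped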
Bucketing pre-aggregator: group vectors into buckets and average.
This pre-aggregator randomly permutes input vectors, splits them into
consecutive buckets of a specified size, and returns the mean of each
bucket. This reduces the number of vectors while preserving some
statistical properties.
Algorithm:
1. Randomly permute the input vectors (or use provided permutation).
2. Split into consecutive buckets of size bucket_size.
3. Return the mean of each bucket.
Parameters:
bucket_size (int) – Number of vectors per bucket. Must be >= 1. The last bucket may be
smaller if the number of vectors is not divisible by bucket_size.
feature_chunk_size (int, optional) – Size of feature chunks for parallel processing. Default is 8192.
perm (Optional[Iterable[int]], optional) – Explicit permutation of indices. If None, a random permutation is
generated. Must be a permutation of range(n) where n is the number
of input vectors.
rng (Optional[random.Random], optional) – Random number generator for shuffling. If None, a new generator is
created.
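The algorithm boils down to the following sketch (without the feature-chunked parallel path):

import random
from typing import Any, List, Optional, Sequence
import numpy as np

def bucketing(vectors: Sequence[Any], bucket_size: int,
              rng: Optional[random.Random] = None) -> List[Any]:
    """Shuffle, split into consecutive buckets, and return each bucket's mean."""
    if bucket_size < 1:
        raise ValueError("bucket_size must be >= 1")
    rng = rng or random.Random()
    order = list(range(len(vectors)))
    rng.shuffle(order)                          # step 1: random permutation
    shuffled = [vectors[i] for i in order]
    return [np.mean(np.stack(shuffled[i:i + bucket_size]), axis=0)
            for i in range(0, len(shuffled), bucket_size)]  # steps 2 and 3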
Parameter server for synchronous distributed training.
This class orchestrates a parameter server training loop where:
1. All honest nodes compute gradients in parallel
2. Byzantine nodes generate malicious gradients (if present)
3. Gradients are optionally pre-aggregated
4. An aggregator combines all gradients into a single update
5. The aggregated gradient is sent back to all nodes
Parameters:
honest_nodes (List[HonestNodeActor]) – List of honest node actors that compute legitimate gradients.
byzantine_nodes (List[ByzantineNodeActor]) – List of Byzantine node actors that generate malicious gradients.
This method:
1. Collects gradients from all honest nodes
2. Generates malicious gradients from Byzantine nodes
3. Optionally applies pre-aggregation
4. Aggregates all gradients
5. Sends the aggregated gradient to all nodes
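Conceptually, one synchronous round follows the sketch below; the node and aggregator methods used here (compute_gradient, attack, pre_aggregate, aggregate, apply_update) are placeholders, not the actual actor API:

def run_round(honest_nodes, byzantine_nodes, aggregator, pre_aggregators=()):
    # 1. Honest nodes compute their gradients.
    grads = [node.compute_gradient() for node in honest_nodes]
    # 2. Byzantine nodes craft malicious gradients (possibly using honest ones).
    grads += [node.attack(honest_grads=grads) for node in byzantine_nodes]
    # 3. Optional pre-aggregation (e.g. bucketing, clipping).
    for pre in pre_aggregators:
        grads = pre.pre_aggregate(grads)
    # 4. Robust aggregation into a single update.
    update = aggregator.aggregate(grads)
    # 5. Broadcast the aggregated gradient back to every node.
    for node in honest_nodes:
        node.apply_update(update)
    return update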
This class provides a high-level API for peer-to-peer distributed training
where nodes communicate directly with their neighbors according to a
topology (ring, complete graph, etc.). It wraps the DecentralizedPeerToPeer
runner for backwards compatibility.
Parameters:
honest_nodes (List[HonestNodeActor]) – List of honest node actors participating in P2P training.
byzantine_nodes (Optional[List[ByzantineNodeActor]]) – Optional list of Byzantine node actors.
topology (Topology) – Communication topology defining which nodes can communicate.
lr (float, optional) – Learning rate for gradient updates. Default is 0.05.
channel_name (str, optional) – Channel name for communication (kept for API compatibility).
Default is “p2p”.
context_factory (Optional[Callable[[str, int], NodeContext]], optional) – Factory function to create node contexts. If None, uses default
context creation.
This is a convenience wrapper for Byzantine nodes that primarily execute
attack pipelines. It reserves the “attack” pipeline name and provides
the run_attack() helper method.
This is a convenience wrapper that reserves special pipeline names for
honest node behaviors:
- “aggregate”: Aggregation pipeline for combining gradients
- “honest_gradient”: Pipeline for computing honest gradients
It provides helper methods like aggregate() and honest_gradient()
that automatically use the registered pipelines.
This class manages an actor pool, one or more computation graphs
(“pipelines”), and provides helpers to run them through a NodeScheduler.
It serves as the bridge between the application layer (aggregators,
attacks, etc.) and the scheduling layer (graphs, actors).
A node application can register multiple pipelines (computation graphs)
and execute them on demand. Each pipeline can use the shared actor pool
for parallel execution.
Parameters:
name (str) – Unique name for this node application.
actor_pool (ActorPool | Sequence[ActorPoolConfig]) – Either an existing ActorPool instance or a sequence of ActorPoolConfig
objects to create a new pool.
metadata (Optional[Mapping[str, Any]], optional) – Base metadata to include in all pipeline executions.
Examples
>>> from byzpy.engine.graph.pool import ActorPoolConfig
>>> app = NodeApplication(
...     name="node0",
...     actor_pool=[ActorPoolConfig(backend="thread", count=4)]
... )
>>> # Register and run pipelines...
Extended NodeScheduler with message-driven execution support.
This scheduler extends NodeScheduler to support message-driven computation
graphs where nodes can wait for messages from other nodes before proceeding.
This is essential for decentralized peer-to-peer training where nodes
communicate asynchronously.
The scheduler maintains a message cache and waiters, allowing operators
to use MessageTriggerOp to wait for specific message types.
Examples
>>> scheduler = MessageAwareNodeScheduler(graph, pool=pool)
>>> # In another task, deliver a message
>>> scheduler.deliver_message("gradient", payload=gradient_vector)
>>> # In graph execution, wait for message
>>> message = await scheduler.wait_for_message("gradient")
This method delivers a message of the specified type. If there are
any tasks waiting for this message type, they are immediately woken
up. Otherwise, the message is cached for future waiters.
Parameters:
message_type (str) – Type identifier for the message.
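The cache-or-wake behaviour can be illustrated with asyncio futures (a simplified sketch of the pattern, not the scheduler's actual implementation):

import asyncio
from collections import defaultdict, deque

class MessageBox:
    """Wake a pending waiter if one exists, otherwise cache the payload."""

    def __init__(self):
        self._cache = defaultdict(deque)    # message_type -> queued payloads
        self._waiters = defaultdict(deque)  # message_type -> pending futures

    def deliver_message(self, message_type, payload=None):
        if self._waiters[message_type]:
            self._waiters[message_type].popleft().set_result(payload)
        else:
            self._cache[message_type].append(payload)

    async def wait_for_message(self, message_type):
        if self._cache[message_type]:
            return self._cache[message_type].popleft()
        fut = asyncio.get_running_loop().create_future()
        self._waiters[message_type].append(fut)
        return await fut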
This class evaluates a computation graph by executing nodes in topological
order. It dispatches operators to an actor pool for parallel execution
when subtasks are supported.
Parameters:
graph (ComputationGraph) – The computation graph to execute.
pool (ActorPool | None, optional) – Optional actor pool for parallel operator execution. If None, operators
run synchronously on the main thread.
metadata (Optional[Mapping[str, Any]], optional) – Metadata to include in operator contexts (e.g., node name, pool size).
Pool of actor workers for parallel task execution.
This class manages a collection of actor workers from one or more backend
configurations. It schedules subtasks across workers based on task
affinity and worker capabilities, enabling efficient parallel execution
of computation graph operators.
The pool supports heterogeneous workers (e.g., mix of CPU and GPU workers)
and automatically routes tasks to appropriate workers based on their
declared capabilities.
Parameters:
configs (Sequence[ActorPoolConfig]) – Sequence of pool configurations. Each config creates a group of
workers with the same backend and capabilities.
Examples
>>> from byzpy.engine.graph.pool import ActorPool, ActorPoolConfig
>>> configs = [
...     ActorPoolConfig(backend="thread", count=4),
...     ActorPoolConfig(backend="gpu", count=2)
... ]
>>> pool = ActorPool(configs)
>>> await pool.start()
>>> # Use pool with NodeScheduler...
>>> await pool.shutdown()
Notes
Workers are lazily started when start() is called.
Tasks are scheduled based on SubTask affinity hints.
The pool supports channel-based communication between workers.
Bind a channel with the given name on every worker in the pool and return a
handle exposing per-worker send/receive helpers. Existing bindings are reused
when the method is called again with the same name.
Configuration for a pool of actors of the same type.
This dataclass specifies how to create a group of actor workers with
the same backend and capabilities.
Parameters:
backend (Union[str, ActorBackend]) – Backend specification. Can be a string (“thread”, “process”, “gpu”,
“tcp://host:port”, “ucx://host:port”) or an ActorBackend instance.
count (int, optional) – Number of workers to create. Default is 1.
capabilities (Sequence[str] | None, optional) – Explicit capability tags for these workers. If None, capabilities are
inferred from the backend. Default is None.
name (Optional[str], optional) – Optional name prefix for worker identification. Default is None.
Resolve an actor backend from a specification string or instance.
This function converts backend specification strings into ActorBackend
instances. It supports various backend types including threads, processes,
GPUs, and remote actors.
Parameters:
spec (Union[str, ActorBackend]) – Backend specification. Can be:
- “thread”: Thread-based actors in the current process
- “process”: Process-based actors in separate processes
- “gpu”: GPU-based actors for CUDA execution
- “tcp://host:port”: Remote actors via TCP
- “ucx://host:port”: Remote actors via UCX (for GPU clusters)
- An existing ActorBackend instance (returned as-is)
Returns:
The resolved actor backend instance.
Return type:
ActorBackend
Raises:
ValueError – If the specification string is not recognized.
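The dispatch on the specification string follows a pattern like the sketch below (illustrative only; it returns the parsed kind and address rather than real backend instances):

from urllib.parse import urlparse

def parse_backend_spec(spec):
    """Map a backend spec string to a (kind, address) pair."""
    if spec in ("thread", "process", "gpu"):
        return spec, None
    parsed = urlparse(spec)
    if parsed.scheme in ("tcp", "ucx") and parsed.hostname:
        return parsed.scheme, (parsed.hostname, parsed.port)
    raise ValueError(f"unrecognised backend spec: {spec!r}")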