API Reference

The sections below group the most commonly used public modules. Each entry is generated with Sphinx autodoc so it stays in sync with the codebase. Docstrings follow the NumPy style.

Aggregators

Shared abstractions for gradient aggregators.

class byzpy.aggregators.base.Aggregator[source]

Bases: Operator, ABC

Base class for every gradient aggregator.

Aggregators combine multiple gradient vectors from different nodes into a single aggregated gradient. This is the core mechanism for Byzantine-robust distributed learning, as aggregators must be resilient to malicious or corrupted gradients.

Subclasses must implement aggregate() to define the aggregation strategy. The base class exposes the Operator interface so aggregators can be scheduled inside computation graphs just like any other operator.

Aggregators can optionally support parallel execution via subtasks by setting supports_subtasks = True and implementing create_subtasks() and reduce_subtasks().
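
A minimal sketch of a custom subclass, assuming only the aggregate() contract described above (the class name and its non-robust mean rule are hypothetical; PyTorch is assumed for stacking):

import torch
from typing import Any, Sequence

from byzpy.aggregators.base import Aggregator

class SimpleMean(Aggregator):
    """Hypothetical aggregator: plain (non-robust) coordinate-wise mean."""

    def aggregate(self, gradients: Sequence[Any]) -> Any:
        if len(gradients) == 0:
            raise ValueError("gradients must be non-empty")
        # Stack the per-node vectors and average along the node axis.
        return torch.stack(list(gradients)).mean(dim=0)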

Examples

>>> import torch
>>> from byzpy.aggregators.coordinate_wise.median import CoordinateWiseMedian
>>> aggregator = CoordinateWiseMedian()
>>> gradients = [torch.randn(100) for _ in range(10)]
>>> result = aggregator.aggregate(gradients)
>>> result.shape
torch.Size([100])
abstractmethod aggregate(gradients: Sequence[Any]) Any[source]

Reduce a sequence of gradient tensors into a single aggregated tensor.

This is the core method that subclasses must implement. It receives a sequence of gradient vectors (one per node) and returns a single aggregated gradient vector.

Parameters:

gradients (Sequence[Any]) – Ordered sequence of gradient tensors (or tensor-like objects). All gradients must have the same shape and be from the same backend (NumPy, PyTorch, etc.). The sequence should be non-empty.

Returns:

A tensor with the same shape, dtype, and device/backend as the input gradients representing the aggregated gradient.

Return type:

Any

Raises:

ValueError – If the gradients sequence is empty or invalid.

compute(inputs: Mapping[str, Any], *, context: OpContext) Any[source]

Compute the aggregated gradient from input gradients.

This method is called by the computation graph scheduler. It extracts gradients from the inputs dictionary and delegates to aggregate().

Parameters:
  • inputs (Mapping[str, Any]) – Input dictionary containing gradients under the key specified by input_key (default: “gradients”).

  • context (OpContext) – Runtime context with node metadata and pool information.

Returns:

Aggregated gradient tensor with the same shape and backend as the input gradients.

Return type:

Any

Raises:
  • KeyError – If the expected input key is missing.

  • TypeError – If the input is not a sequence.

input_key = 'gradients'
name: str = 'aggregator'
class byzpy.aggregators.coordinate_wise.median.CoordinateWiseMedian(*, chunk_size: int = 8192)[source]

Bases: Aggregator

Coordinate-wise median aggregator.

This aggregator computes the median independently for each coordinate (dimension) across all input gradients. It is robust to up to 50% Byzantine nodes when the honest gradients are well-separated.

The median is computed coordinate-wise, meaning for each dimension d, the output is the median of all gradients’ d-th coordinate. This makes it computationally efficient and suitable for high-dimensional gradients.

The aggregator supports parallel execution via subtasks, chunking the computation across multiple workers for improved performance on large gradients.

Parameters:

chunk_size (int, optional) – Size of chunks for parallel processing. Larger values reduce overhead but may limit parallelism. Default is 8192.

Examples

>>> import torch
>>> aggregator = CoordinateWiseMedian(chunk_size=4096)
>>> gradients = [torch.randn(1000) for _ in range(10)]
>>> result = aggregator.aggregate(gradients)
>>> assert result.shape == (1000,)

Notes

  • Robust to up to floor((n-1)/2) Byzantine nodes where n is the total number of gradients.

  • Time complexity: O(n * d) where n is number of gradients and d is dimension. With subtasks: O(n * d / workers).

  • Memory complexity: O(n * d) for stacking gradients.
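
A reference sketch of the coordinate-wise median described above, in plain PyTorch and ignoring the chunked subtask path:

import torch

def coordinate_wise_median(gradients):
    # Stack to shape (n, d) and take the median along the node axis.
    stacked = torch.stack(list(gradients))
    return stacked.median(dim=0).values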

aggregate(gradients: Sequence[Any]) Any[source]

Compute coordinate-wise median of gradients.

Parameters:

gradients (Sequence[Any]) – Non-empty sequence of gradient tensors/arrays. All must have the same shape and be from the same backend (NumPy, PyTorch, etc.).

Returns:

Aggregated gradient tensor with the same shape and backend as inputs. Each coordinate is the median of that coordinate across all input gradients.

Return type:

Any

Raises:

ValueError – If gradients sequence is empty.

class byzpy.aggregators.coordinate_wise.trimmed_mean.CoordinateWiseTrimmedMean(f: int, *, chunk_size: int = 4096)[source]

Bases: Aggregator

Coordinate-wise Trimmed Mean (CwTM) aggregator.

This aggregator computes a trimmed mean independently for each coordinate. For each dimension, it sorts the values, removes the f smallest and f largest values, and averages the remaining (n - 2f) values.

This makes it robust to up to f Byzantine nodes on each coordinate when honest gradients are well-separated from outliers.

Parameters:
  • f (int) – Number of extreme values to trim from each end. Must satisfy 0 <= 2f < n where n is the number of gradients.

  • chunk_size (int, optional) – Size of chunks for parallel processing. Default is 4096.

Examples

>>> import torch
>>> aggregator = CoordinateWiseTrimmedMean(f=2, chunk_size=2048)
>>> gradients = [torch.randn(1000) for _ in range(10)]
>>> result = aggregator.aggregate(gradients)
>>> assert result.shape == (1000,)

Notes

  • Robust to up to f Byzantine nodes per coordinate.

  • Time complexity: O(n * d * log(n)) for sorting, O(n * d) for averaging. With subtasks: O(n * d * log(n) / workers).

  • Memory complexity: O(n * d) for stacking and sorting gradients.
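
A reference sketch of the per-coordinate trimmed mean described above, without the chunked parallel path:

import torch

def coordinate_wise_trimmed_mean(gradients, f):
    n = len(gradients)
    if not 0 <= 2 * f < n:
        raise ValueError("need 0 <= 2f < n")
    stacked = torch.stack(list(gradients))        # shape (n, d)
    sorted_vals, _ = torch.sort(stacked, dim=0)   # sort each coordinate independently
    return sorted_vals[f:n - f].mean(dim=0)       # average the middle n - 2f values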


aggregate(gradients: Sequence[Any]) Any[source]

Compute coordinate-wise trimmed mean of gradients.

Parameters:

gradients (Sequence[Any]) – Non-empty sequence of gradient tensors. All must have the same shape and backend. The sequence length n must satisfy 2f < n.

Returns:

Aggregated gradient tensor with the same shape and backend as inputs. Each coordinate is the mean of the middle (n-2f) values after sorting.

Return type:

Any

Raises:

ValueError – If gradients is empty or if 2f >= n.

class byzpy.aggregators.coordinate_wise.mean_of_medians.MeanOfMedians(f: int, *, chunk_size: int = 8192)[source]

Bases: Aggregator

Coordinate-wise Mean of Medians (MeaMed).

For each coordinate k this aggregator:

  • Computes the median m_k of {x_{1k}, ..., x_{nk}}.

  • Keeps the (n - f) values whose |x_{ik} - m_k| are smallest.

  • Returns the mean of the retained values.
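
A compact sketch of the MeaMed rule above, assuming PyTorch tensors and ignoring chunking:

import torch

def mean_of_medians(gradients, f):
    stacked = torch.stack(list(gradients))                 # shape (n, d)
    n = stacked.shape[0]
    median = stacked.median(dim=0).values                  # m_k per coordinate
    dist = (stacked - median).abs()                        # |x_{ik} - m_k|
    # Per coordinate, keep the n - f values closest to the median and average them.
    idx = dist.topk(n - f, dim=0, largest=False).indices
    return stacked.gather(0, idx).mean(dim=0)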

aggregate(gradients: Sequence[Any]) Any[source]

Drop the f farthest values per coordinate and average the rest.

Args:

gradients: Sequence of gradient tensors of identical shape/dtype.

Returns:

Tensor whose entries are the mean of the n - f closest values to the coordinate-wise medians.

class byzpy.aggregators.geometric_wise.geometric_median.GeometricMedian(*, tol: float = 1e-06, max_iter: int = 256, eps: float = 1e-12, init: str = 'median', chunk_size: int = 32)[source]

Bases: Aggregator

Geometric Median via Weiszfeld’s algorithm.

Minimizes sum_i ||x - g_i||_2. Uses backend ops via get_backend().
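
A minimal sketch of the Weiszfeld iteration in plain PyTorch (ignoring the backend abstraction and chunking; the coordinate-wise median start matches the default init="median"):

import torch

def weiszfeld(gradients, tol=1e-6, max_iter=256, eps=1e-12):
    stacked = torch.stack(list(gradients))        # shape (n, d)
    x = stacked.median(dim=0).values              # coordinate-wise median as the initial point
    for _ in range(max_iter):
        dists = torch.norm(stacked - x, dim=1).clamp_min(eps)
        weights = 1.0 / dists
        x_new = (weights[:, None] * stacked).sum(dim=0) / weights.sum()
        if torch.norm(x_new - x) <= tol:
            return x_new
        x = x_new
    return x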

aggregate(gradients: Sequence[Any]) Any[source]

Run Weiszfeld iterations until convergence or max_iter steps.

Args:

gradients: Sequence of gradient tensors to summarize.

class byzpy.aggregators.geometric_wise.krum.Krum(f: int, *, chunk_size: int = 32)[source]

Bases: Aggregator

Krum aggregator (Multi-Krum with q=1).

This is the original Krum algorithm: it selects the single gradient with the smallest Krum score (the sum of squared distances to its nearest neighbors) and returns it directly, without averaging.

Parameters:
  • f (int) – Maximum number of Byzantine nodes to tolerate. Must satisfy 0 <= f < n-1 where n is the number of gradients.

  • chunk_size (int, optional) – Size of chunks for parallel distance computation. Default is 32.

Examples

>>> import torch
>>> aggregator = Krum(f=2, chunk_size=16)
>>> gradients = [torch.randn(100) for _ in range(10)]
>>> result = aggregator.aggregate(gradients)
>>> assert result.shape == (100,)

Notes

  • Equivalent to MultiKrum(f=f, q=1).

  • Returns a single selected gradient rather than the mean of q gradients.

  • Robust to up to f Byzantine nodes.

See also

MultiKrum

The general version that selects q gradients.

aggregate(gradients: Sequence[Any]) Any[source]

Return the single gradient picked by the underlying Multi-Krum.

Parameters:

gradients (Sequence[Any]) – Sequence of gradient tensors. All must have the same shape and backend.

Returns:

The single selected gradient (not averaged). Same shape and backend as inputs.

Return type:

Any

class byzpy.aggregators.geometric_wise.krum.MultiKrum(f: int, q: int, *, chunk_size: int = 32)[source]

Bases: Aggregator

Multi-Krum aggregator for Byzantine-robust gradient aggregation.

Multi-Krum is a geometric aggregation method that selects the q gradients with the smallest sum of squared distances to their nearest neighbors, then returns their mean. This makes it robust to up to f Byzantine nodes.

Algorithm:

  1. For each gradient i, compute the sum of squared Euclidean distances to its (n - f - 1) nearest neighbors (excluding itself).

  2. Select the q gradients with the smallest sums (Krum scores).

  3. Return the mean of the selected gradients.
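
A direct O(n^2 * d) sketch of these steps, without the chunked distance computation:

import torch

def multi_krum(gradients, f, q):
    stacked = torch.stack(list(gradients))            # shape (n, d)
    n = stacked.shape[0]
    sq_dists = torch.cdist(stacked, stacked) ** 2     # squared pairwise distances
    sorted_dists, _ = torch.sort(sq_dists, dim=1)
    # Column 0 is the zero distance to itself; keep the n - f - 1 nearest neighbours.
    scores = sorted_dists[:, 1:n - f].sum(dim=1)
    selected = torch.topk(scores, q, largest=False).indices
    return stacked[selected].mean(dim=0)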

Parameters:
  • f (int) – Maximum number of Byzantine nodes to tolerate. Must satisfy 0 <= f < n-1 where n is the number of gradients.

  • q (int) – Number of gradients to select. Must satisfy 1 <= q <= n - f.

  • chunk_size (int, optional) – Size of chunks for parallel distance computation. Default is 32.

Examples

>>> import torch
>>> aggregator = MultiKrum(f=2, q=5, chunk_size=16)
>>> gradients = [torch.randn(100) for _ in range(10)]
>>> result = aggregator.aggregate(gradients)
>>> assert result.shape == (100,)

Notes

  • Robust to up to f Byzantine nodes when honest gradients are well-separated.

  • Time complexity: O(n^2 * d) for distance computation, where n is number of gradients and d is dimension. With subtasks: O(n^2 * d / workers).

  • Memory complexity: O(n^2) for distance matrix, O(n * d) for gradients.

  • The original Krum algorithm corresponds to q=1.


aggregate(gradients: Sequence[Any]) Any[source]

Select the q most consensus gradients using the Krum score.

Parameters:

gradients (Sequence[Any]) – Sequence of gradient tensors. All must have the same shape and backend. The sequence length n must satisfy f < n-1 and q <= n-f.

Returns:

Mean of the q selected gradients. Same shape and backend as inputs.

Return type:

Any

Raises:

ValueError – If gradients is empty, or if f >= n-1, or if q > n-f.

class byzpy.aggregators.geometric_wise.minimum_diameter_average.MinimumDiameterAveraging(f: int, *, chunk_size: int = 256)[source]

Bases: Aggregator

Minimum Diameter Averaging (MDA), exact combinatorial search.

Chooses a subset S with |S| = n - f that minimizes the diameter max_{i,j in S} ||x_i - x_j|| and returns the average of vectors in that subset.

Constraint: 0 <= f < n.
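
A brute-force sketch of the exact search (exponential in n; it only illustrates the selection rule):

import itertools
import torch

def minimum_diameter_average(gradients, f):
    stacked = torch.stack(list(gradients))
    n = stacked.shape[0]
    dists = torch.cdist(stacked, stacked)
    best_idx, best_diameter = None, float("inf")
    for subset in itertools.combinations(range(n), n - f):
        idx = torch.tensor(subset)
        diameter = dists[idx][:, idx].max().item()
        if diameter < best_diameter:
            best_idx, best_diameter = idx, diameter
    return stacked[best_idx].mean(dim=0)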

aggregate(gradients: Sequence[Any]) Any[source]

Exhaustively search the (n - f)-subsets for the minimum-diameter set.

Args:

gradients: Sequence of tensors; length must exceed f.

class byzpy.aggregators.geometric_wise.monna.MoNNA(f: int, *, reference_index: int = 0, chunk_size: int = 32)[source]

Bases: Aggregator

MoNNA aggregator: average of the n-f nearest neighbors of a designated reference (trusted) vector.
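
A short sketch of the MoNNA rule, assuming PyTorch tensors and the default reference_index=0:

import torch

def monna(gradients, f, reference_index=0):
    stacked = torch.stack(list(gradients))         # shape (n, d)
    n = stacked.shape[0]
    ref = stacked[reference_index]
    dists = torch.norm(stacked - ref, dim=1)
    # Average the n - f vectors closest to the trusted reference (itself included).
    nearest = torch.topk(dists, n - f, largest=False).indices
    return stacked[nearest].mean(dim=0)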

aggregate(gradients: Sequence[Any]) Any[source]

Reduce a sequence of gradient tensors into a single aggregated tensor.

This is the core method that subclasses must implement. It receives a sequence of gradient vectors (one per node) and returns a single aggregated gradient vector.

Parameters:

gradients (Sequence[Any]) – Ordered sequence of gradient tensors (or tensor-like objects). All gradients must have the same shape and be from the same backend (NumPy, PyTorch, etc.). The sequence should be non-empty.

Returns:

A tensor with the same shape, dtype, and device/backend as the input gradients representing the aggregated gradient.

Return type:

Any

Raises:

ValueError – If the gradients sequence is empty or invalid.

class byzpy.aggregators.geometric_wise.smea.SMEA(f: int, *, chunk_size: int = 256)[source]

Bases: Aggregator

Smallest Maximum Eigenvalue Averaging (SMEA).

Enumerates all subsets of size n - f, selects the one whose empirical covariance has the smallest maximum eigenvalue, and returns that subset’s mean. Uses an m×m eigenproblem on the Gram matrix (with m = n - f, the subset size) instead of the full d×d covariance for efficiency, and supports chunked evaluation via the task scheduler.
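
A sketch of the selection criterion using the Gram-matrix trick: the non-zero eigenvalues of a subset's empirical covariance coincide with those of the m×m Gram matrix of the centred vectors. The real aggregator approximates the top eigenvalue (see power_iters); this sketch uses an exact eigensolver.

import itertools
import torch

def smea(gradients, f):
    stacked = torch.stack(list(gradients))                # shape (n, d)
    n = stacked.shape[0]
    best_idx, best_eig = None, float("inf")
    for subset in itertools.combinations(range(n), n - f):
        sub = stacked[torch.tensor(subset)]
        centred = sub - sub.mean(dim=0)
        gram = centred @ centred.T / sub.shape[0]         # (n - f) x (n - f) Gram matrix
        max_eig = torch.linalg.eigvalsh(gram)[-1].item()  # eigenvalues in ascending order
        if max_eig < best_eig:
            best_idx, best_eig = torch.tensor(subset), max_eig
    return stacked[best_idx].mean(dim=0)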

aggregate(gradients: Sequence[Any]) Any[source]

Reduce a sequence of gradient tensors into a single aggregated tensor.

This is the core method that subclasses must implement. It receives a sequence of gradient vectors (one per node) and returns a single aggregated gradient vector.

Parameters:

gradients (Sequence[Any]) – Ordered sequence of gradient tensors (or tensor-like objects). All gradients must have the same shape and be from the same backend (NumPy, PyTorch, etc.). The sequence should be non-empty.

Returns:

A tensor with the same shape, dtype, and device/backend as the input gradients representing the aggregated gradient.

Return type:

Any

Raises:

ValueError – If the gradients sequence is empty or invalid.

class byzpy.aggregators.norm_wise.center_clipping.CenteredClipping(*, c_tau: float, M: int = 10, eps: float = 1e-12, init: str = 'mean', chunk_size: int = 32)[source]

Bases: Aggregator

Centered Clipping (CC) aggregator.

An iterative aggregation method that clips gradients based on their distance from a running center estimate. The center is updated iteratively by averaging clipped gradients.

Algorithm:
For iteration m = 1 to M:

v_m = v_{m-1} + (1/n) * sum_i (x_i - v_{m-1}) * min(1, c_tau / ||x_i - v_{m-1}||)
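
A plain-PyTorch sketch of these iterations with the default "mean" initialisation and no chunking:

import torch

def centered_clipping(gradients, c_tau, M=10, eps=1e-12):
    stacked = torch.stack(list(gradients))          # shape (n, d)
    v = stacked.mean(dim=0)                         # v_0 with the "mean" initialisation
    for _ in range(M):
        diff = stacked - v
        norms = torch.norm(diff, dim=1).clamp_min(eps)
        clip = torch.clamp(c_tau / norms, max=1.0)  # min(1, c_tau / ||x_i - v_{m-1}||)
        v = v + (clip[:, None] * diff).mean(dim=0)
    return v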

Parameters:
  • c_tau (float) – Clipping radius (non-negative). Gradients beyond this distance from the center are clipped to this radius.

  • M (int, optional) – Number of iterations. Default is 10. More iterations improve convergence but increase computation time.

  • eps (float, optional) – Small constant to avoid division by zero. Default is 1e-12.

  • init (str, optional) – Initialization strategy for v0. Options: “mean”, “median”, or “zero”. Default is “mean”.

  • chunk_size (int, optional) – Size of chunks for parallel processing. Default is 32.

Examples

>>> import torch
>>> aggregator = CenteredClipping(c_tau=1.0, M=10, init="mean")
>>> gradients = [torch.randn(100) for _ in range(10)]
>>> result = aggregator.aggregate(gradients)
>>> assert result.shape == (100,)

Notes

  • Robust to Byzantine nodes when honest gradients are concentrated.

  • Time complexity: O(M * n * d) where M is iterations, n is number of gradients, d is dimension. With subtasks: O(M * n * d / workers).

  • Memory complexity: O(n * d) for gradients, O(d) for center estimate.

  • Supports barriered subtasks for parallel iteration execution.


aggregate(gradients: Sequence[Any]) Any[source]

Run M iterations of the centered clipping update.

Parameters:

gradients (Sequence[Any]) – Non-empty sequence of gradient tensors. All must have the same shape and backend.

Returns:

Aggregated gradient tensor (the final center estimate v_M) with the same shape and backend as inputs.

Return type:

Any

Raises:

ValueError – If gradients sequence is empty.

class byzpy.aggregators.norm_wise.comparative_gradient_elimination.ComparativeGradientElimination(f: int, *, chunk_size: int = 8192)[source]

Bases: Aggregator

Comparative Gradient Elimination (CGE).

Sort inputs by their L2 norm and average the (n - f) vectors with the smallest norms.

Args (constructor):

f: number of vectors to drop by norm (must be >= 0)
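
A minimal sketch of CGE, assuming PyTorch tensors:

import torch

def cge(gradients, f):
    stacked = torch.stack(list(gradients))          # shape (n, d)
    n = stacked.shape[0]
    norms = torch.norm(stacked, dim=1)
    # Keep the n - f smallest-norm vectors and average them.
    keep = torch.topk(norms, n - f, largest=False).indices
    return stacked[keep].mean(dim=0)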

aggregate(gradients: Sequence[Any]) Any[source]

Remove the f gradients with the largest L2 norms and average the rest.

Args:

gradients: Sequence of tensors to compare by L2 norm.

class byzpy.aggregators.norm_wise.caf.CAF(f: int, *, chunk_size: int = 256, power_iters: int = 3)[source]

Bases: Aggregator

Covariance-bound Agnostic Filter (CAF).

aggregate(gradients: Sequence[Any]) Any[source]

Reduce a sequence of gradient tensors into a single aggregated tensor.

This is the core method that subclasses must implement. It receives a sequence of gradient vectors (one per node) and returns a single aggregated gradient vector.

Parameters:

gradients (Sequence[Any]) – Ordered sequence of gradient tensors (or tensor-like objects). All gradients must have the same shape and be from the same backend (NumPy, PyTorch, etc.). The sequence should be non-empty.

Returns:

A tensor with the same shape, dtype, and device/backend as the input gradients representing the aggregated gradient.

Return type:

Any

Raises:

ValueError – If the gradients sequence is empty or invalid.

Attacks

class byzpy.attacks.base.Attack[source]

Bases: Operator, ABC

Base class for Byzantine attack implementations.

Attacks simulate malicious behavior in distributed learning by generating adversarial gradient vectors. Subclasses implement different attack strategies by setting input requirement flags and implementing apply().

Input Requirements

Subclasses declare what information they need by setting class attributes:

  • uses_base_grad: If True, the attack needs the node’s own honest gradient vector.

  • uses_model_batch: If True, the attack needs the model, input batch (x), and labels (y) for gradient computation.

  • uses_honest_grads: If True, the attack needs a list of other honest nodes’ gradient vectors.
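
A minimal sketch of a custom attack, assuming only the flags and the apply() contract described here (the class name and its shifted-mean rule are hypothetical):

import torch

from byzpy.attacks.base import Attack

class ShiftedMeanAttack(Attack):
    """Hypothetical attack: honest mean plus a constant offset."""

    uses_honest_grads = True  # request the honest gradients as input

    def apply(self, *, model=None, x=None, y=None, honest_grads=None, base_grad=None):
        if not honest_grads:
            raise ValueError("honest_grads is required")
        mean = torch.stack(list(honest_grads)).mean(dim=0)
        return mean + 0.5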

Examples

>>> from byzpy.attacks.empire import EmpireAttack
>>> attack = EmpireAttack(scale=-1.0)
>>> malicious_grad = attack.apply(honest_grads=[grad1, grad2, grad3])
>>> assert malicious_grad.shape == grad1.shape

Notes

  • All attacks must return a single gradient vector with the same shape and backend as the inputs.

  • Attacks are used in simulations to test Byzantine-robust aggregators.

  • The attack’s apply method is called by the computation graph scheduler with the requested inputs.

abstractmethod apply(*, model: Module | None = None, x: Tensor | None = None, y: Tensor | None = None, honest_grads: List[Any] | None = None, base_grad: Any | None = None) Any[source]

Generate and return a malicious gradient vector.

This method implements the attack strategy. It receives the requested inputs (based on the class’s uses_* flags) and returns a single malicious gradient vector.

Parameters:
  • model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).

  • x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).

  • y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).

  • honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if uses_honest_grads=True).

  • base_grad (Optional[Any]) – The node’s own honest gradient vector (required if uses_base_grad=True).

Returns:

A single malicious gradient vector with the same shape, dtype, and device/backend as the input gradients.

Return type:

Any

Raises:

ValueError – If required inputs are missing or invalid.

compute(inputs: Mapping[str, Any], *, context: OpContext) Any[source]
name: str = 'attack'
supports_subtasks: bool = False
uses_base_grad: bool = False
uses_honest_grads: bool = False
uses_model_batch: bool = False
class byzpy.attacks.empire.EmpireAttack(scale: float = -1.0, *, chunk_size: int = 8)[source]

Bases: Attack

Empire attack: scaled mean of honest gradients.

This attack computes the mean of honest gradients and scales it by a factor (typically negative). It requires access to honest gradients from other nodes.

The attack strategy is: g_malicious = scale * mean(honest_grads)

Parameters:
  • scale (float, optional) – Scaling factor for the mean. Default is -1.0 (inverted mean). Negative values create adversarial gradients that point in the opposite direction of the honest consensus.

  • chunk_size (int, optional) – Size of chunks for parallel mean computation. Default is 8.

Examples

>>> import torch
>>> attack = EmpireAttack(scale=-1.0)
>>> honest_grads = [torch.randn(100) for _ in range(5)]
>>> malicious = attack.apply(honest_grads=honest_grads)
>>> assert malicious.shape == (100,)
>>> # malicious is approximately -mean(honest_grads)

Notes

  • Requires uses_honest_grads=True (needs access to honest gradients).

  • Supports parallel execution via subtasks for large gradients.

  • Time complexity: O(n * d) where n is number of honest gradients and d is dimension. With subtasks: O(n * d / workers).

  • Memory complexity: O(n * d) for stacking gradients.


apply(*, model: Module | None = None, x: Tensor | None = None, y: Tensor | None = None, honest_grads: List[Any] | None = None, base_grad: Any | None = None) Any[source]

Generate and return a malicious gradient vector.

This method implements the attack strategy. It receives the requested inputs (based on the class’s uses_* flags) and returns a single malicious gradient vector.

Parameters:
  • model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).

  • x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).

  • y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).

  • honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if uses_honest_grads=True).

  • base_grad (Optional[Any]) – The node’s own honest gradient vector (required if uses_base_grad=True).

Returns:

A single malicious gradient vector with the same shape, dtype, and device/backend as the input gradients.

Return type:

Any

Raises:

ValueError – If required inputs are missing or invalid.

class byzpy.attacks.label_flip.LabelFlipAttack(*, num_classes: int | None = None, mapping: Dict[int, int] | None = None, loss_fn: Module | None = None, scale: float = 1.0)[source]

Bases: Attack

Compute ∇_θ L(model(x), flip(y)) and return the flattened vector.
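
A rough sketch of this idea; the flipping rule below (num_classes - 1 - y) and the cross-entropy loss are assumptions for illustration, while the actual class also accepts an explicit mapping, a custom loss_fn, and a scale factor:

import torch
import torch.nn.functional as F

def label_flip_gradient(model, x, y, num_classes):
    flipped = (num_classes - 1) - y                 # one simple flipping rule
    model.zero_grad()
    loss = F.cross_entropy(model(x), flipped)
    loss.backward()
    # Flatten all parameter gradients into a single vector.
    return torch.cat([p.grad.reshape(-1) for p in model.parameters() if p.grad is not None])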

apply(*, model: Module | None = None, x: Tensor | None = None, y: Tensor | None = None, honest_grads: List[Tensor] | None = None, base_grad: Tensor | None = None) Tensor[source]

Generate and return a malicious gradient vector.

This method implements the attack strategy. It receives the requested inputs (based on the class’s uses_* flags) and returns a single malicious gradient vector.

Parameters:
  • model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).

  • x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).

  • y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).

  • honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if uses_honest_grads=True).

  • base_grad (Optional[Any]) – The node’s own honest gradient vector (required if uses_base_grad=True).

Returns:

A single malicious gradient vector with the same shape, dtype, and device/backend as the input gradients.

Return type:

Any

Raises:

ValueError – If required inputs are missing or invalid.

class byzpy.attacks.little.LittleAttack(f: int, N: int | None = None, *, chunk_size: int = 8192)[source]

Bases: Attack

‘Little is Enough’ attack: g_mal = μ + z_max * σ, with

s = floor(N/2) + 1 - f
z_max = Φ^{-1}((N - s) / N)
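
A sketch of the construction, assuming μ and σ are the coordinate-wise mean and standard deviation of the honest gradients (the usual reading of this attack):

import torch
from torch.distributions import Normal

def little_is_enough(honest_grads, f, N):
    stacked = torch.stack(list(honest_grads))
    mu = stacked.mean(dim=0)
    sigma = stacked.std(dim=0)
    s = N // 2 + 1 - f
    z_max = Normal(0.0, 1.0).icdf(torch.tensor((N - s) / N))  # Φ^{-1}((N - s) / N)
    return mu + z_max * sigma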

apply(*, model: Module | None = None, x: Tensor | None = None, y: Tensor | None = None, honest_grads: List[Any] | None = None, base_grad: Any | None = None) Any[source]

Generate and return a malicious gradient vector.

This method implements the attack strategy. It receives the requested inputs (based on the class’s uses_* flags) and returns a single malicious gradient vector.

Parameters:
  • model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).

  • x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).

  • y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).

  • honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if uses_honest_grads=True).

  • base_grad (Optional[Any]) – The node’s own honest gradient vector (required if uses_base_grad=True).

Returns:

A single malicious gradient vector with the same shape, dtype, and device/backend as the input gradients.

Return type:

Any

Raises:

ValueError – If required inputs are missing or invalid.

class byzpy.attacks.sign_flip.SignFlipAttack(scale: float = -1.0, *, chunk_size: int = 8192)[source]

Bases: Attack

g_mal = scale * base_grad (default: sign flip with scale=-1).

apply(*, model: Module | None = None, x: Tensor | None = None, y: Tensor | None = None, honest_grads: List[Any] | None = None, base_grad: Any | None = None) Any[source]

Generate and return a malicious gradient vector.

This method implements the attack strategy. It receives the requested inputs (based on the class’s uses_* flags) and returns a single malicious gradient vector.

Parameters:
  • model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).

  • x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).

  • y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).

  • honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if uses_honest_grads=True).

  • base_grad (Optional[Any]) – The node’s own honest gradient vector (required if uses_base_grad=True).

Returns:

A single malicious gradient vector with the same shape, dtype, and device/backend as the input gradients.

Return type:

Any

Raises:

ValueError – If required inputs are missing or invalid.

class byzpy.attacks.gaussian.GaussianAttack(mu: float = 0.0, sigma: float = 1.0, *, seed: int | None = None, chunk_size: int = 8192)[source]

Bases: Attack

Sample each coordinate independently from N(mu, sigma^2).

apply(*, model: Module | None = None, x: Tensor | None = None, y: Tensor | None = None, honest_grads: List[Any] | None = None, base_grad: Any | None = None) Any[source]

Generate and return a malicious gradient vector.

This method implements the attack strategy. It receives the requested inputs (based on the class’s uses_* flags) and returns a single malicious gradient vector.

Parameters:
  • model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).

  • x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).

  • y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).

  • honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if uses_honest_grads=True).

  • base_grad (Optional[Any]) – The node’s own honest gradient vector (required if uses_base_grad=True).

Returns:

A single malicious gradient vector with the same shape, dtype, and device/backend as the input gradients.

Return type:

Any

Raises:

ValueError – If required inputs are missing or invalid.

class byzpy.attacks.inf.InfAttack(*, chunk_size: int = 8192)[source]

Bases: Attack

Return a vector filled with +inf matching the gradient shape.

apply(*, model: Module | None = None, x: Tensor | None = None, y: Tensor | None = None, honest_grads: List[Any] | None = None, base_grad: Any | None = None) Any[source]

Generate and return a malicious gradient vector.

This method implements the attack strategy. It receives the requested inputs (based on the class’s uses_* flags) and returns a single malicious gradient vector.

Parameters:
  • model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).

  • x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).

  • y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).

  • honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if uses_honest_grads=True).

  • base_grad (Optional[Any]) – The node’s own honest gradient vector (required if uses_base_grad=True).

Returns:

A single malicious gradient vector with the same shape, dtype, and device/backend as the input gradients.

Return type:

Any

Raises:

ValueError – If required inputs are missing or invalid.

class byzpy.attacks.mimic.MimicAttack(epsilon: int = 0, *, chunk_size: int = 8192)[source]

Bases: Attack

Mimic an honest worker: return the gradient vector of the honest worker at index epsilon.

apply(*, model: Module | None = None, x: Tensor | None = None, y: Tensor | None = None, honest_grads: List[Any] | None = None, base_grad: Any | None = None) Any[source]

Generate and return a malicious gradient vector.

This method implements the attack strategy. It receives the requested inputs (based on the class’s uses_* flags) and returns a single malicious gradient vector.

Parameters:
  • model (Optional[nn.Module]) – PyTorch model (required if uses_model_batch=True).

  • x (Optional[torch.Tensor]) – Input batch tensor (required if uses_model_batch=True).

  • y (Optional[torch.Tensor]) – Label tensor (required if uses_model_batch=True).

  • honest_grads (Optional[List[Any]]) – List of honest nodes’ gradient vectors (required if uses_honest_grads=True).

  • base_grad (Optional[Any]) – The node’s own honest gradient vector (required if uses_base_grad=True).

Returns:

A single malicious gradient vector with the same shape, dtype, and device/backend as the input gradients.

Return type:

Any

Raises:

ValueError – If required inputs are missing or invalid.

Pre-Aggregators

class byzpy.pre_aggregators.base.PreAggregator[source]

Bases: Operator, ABC

Base class for pre-aggregation operations.

Pre-aggregators transform a sequence of vectors before aggregation. They can reshape, filter, or combine vectors in various ways. Unlike aggregators which reduce to a single vector, pre-aggregators return a list of vectors (possibly of different length than the input).

Common use cases include:

  • Bucketing: group vectors into buckets and average within each bucket.

  • Clipping: clip vectors to a maximum norm.

  • Nearest-neighbor mixing: combine vectors with their nearest neighbors.

Subclasses must implement pre_aggregate() to define the transformation strategy.
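
A minimal sketch of a custom pre-aggregator, assuming only the pre_aggregate() contract described above (the class name and its pairwise-averaging rule are hypothetical):

import torch
from typing import Any, List, Sequence

from byzpy.pre_aggregators.base import PreAggregator

class PairwiseAveraging(PreAggregator):
    """Hypothetical pre-aggregator: average consecutive pairs of vectors."""

    def pre_aggregate(self, xs: Sequence[Any]) -> List[Any]:
        if not xs:
            raise ValueError("xs must be non-empty")
        vectors = list(xs)
        return [torch.stack(vectors[i:i + 2]).mean(dim=0)
                for i in range(0, len(vectors), 2)]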

Examples

>>> import torch
>>> from byzpy.pre_aggregators.bucketing import Bucketing
>>> preagg = Bucketing(bucket_size=4)
>>> vectors = [torch.randn(100) for _ in range(10)]
>>> result = preagg.pre_aggregate(vectors)
>>> len(result)  # 10 vectors -> 3 buckets (ceil(10/4))
3
compute(inputs: Mapping[str, Any], *, context: OpContext) List[Any][source]

Compute the pre-aggregated vectors from input vectors.

This method is called by the computation graph scheduler. It extracts vectors from the inputs dictionary and delegates to pre_aggregate().

Parameters:
  • inputs (Mapping[str, Any]) – Input dictionary containing vectors under the key specified by input_key (default: “vectors”).

  • context (OpContext) – Runtime context with node metadata and pool information.

Returns:

List of transformed vectors. The length may differ from the input.

Return type:

List[Any]

Raises:
  • KeyError – If the expected input key is missing.

  • TypeError – If the input is not a sequence.

input_key = 'vectors'
name: str = 'pre_aggregator'
abstractmethod pre_aggregate(xs: Sequence[Any]) List[Any][source]

Transform a sequence of vectors into a list of pre-aggregated vectors.

Parameters:

xs (Sequence[Any]) – Input sequence of vectors. All vectors must have the same shape and backend. The sequence should be non-empty.

Returns:

List of transformed vectors. Each vector has the same shape and backend as inputs, but the list length may differ from the input length.

Return type:

List[Any]

Raises:

ValueError – If the input sequence is empty or invalid.

class byzpy.pre_aggregators.bucketing.Bucketing(bucket_size: int, *, feature_chunk_size: int = 8192, perm: Iterable[int] | None = None, rng: Random | None = None)[source]

Bases: PreAggregator

Bucketing pre-aggregator: group vectors into buckets and average.

This pre-aggregator randomly permutes input vectors, splits them into consecutive buckets of a specified size, and returns the mean of each bucket. This reduces the number of vectors while preserving some statistical properties.

Algorithm:

  1. Randomly permute the input vectors (or use the provided permutation).

  2. Split into consecutive buckets of size bucket_size.

  3. Return the mean of each bucket.
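
A plain sketch of these three steps (no feature chunking), assuming PyTorch tensors:

import random
import torch

def bucketing(xs, bucket_size, rng=None):
    rng = rng or random.Random()
    perm = list(range(len(xs)))
    rng.shuffle(perm)                                      # 1. random permutation
    shuffled = [xs[i] for i in perm]
    buckets = [shuffled[i:i + bucket_size]                 # 2. consecutive buckets
               for i in range(0, len(shuffled), bucket_size)]
    return [torch.stack(b).mean(dim=0) for b in buckets]   # 3. per-bucket mean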

Parameters:
  • bucket_size (int) – Number of vectors per bucket. Must be >= 1. The last bucket may be smaller if the number of vectors is not divisible by bucket_size.

  • feature_chunk_size (int, optional) – Size of feature chunks for parallel processing. Default is 8192.

  • perm (Optional[Iterable[int]], optional) – Explicit permutation of indices. If None, a random permutation is generated. Must be a permutation of range(n) where n is the number of input vectors.

  • rng (Optional[random.Random], optional) – Random number generator for shuffling. If None, a new generator is created.

Examples

>>> import torch
>>> preagg = Bucketing(bucket_size=4)
>>> vectors = [torch.randn(100) for _ in range(10)]
>>> result = preagg.pre_aggregate(vectors)
>>> len(result)  # 10 vectors -> ceil(10/4) = 3 buckets
3
>>> assert all(v.shape == (100,) for v in result)

Notes

  • Output length is ceil(n / bucket_size) where n is input length.

  • Supports parallel execution via subtasks for large feature dimensions.

  • Time complexity: O(n * d) where n is number of vectors and d is dimension. With subtasks: O(n * d / workers).

  • Memory complexity: O(n * d) for stacking vectors.

pre_aggregate(xs: Sequence[Any]) List[Any][source]

Transform a sequence of vectors into a list of pre-aggregated vectors.

Parameters:

xs (Sequence[Any]) – Input sequence of vectors. All vectors must have the same shape and backend. The sequence should be non-empty.

Returns:

List of transformed vectors. Each vector has the same shape and backend as inputs, but the list length may differ from the input length.

Return type:

List[Any]

Raises:

ValueError – If the input sequence is empty or invalid.

class byzpy.pre_aggregators.clipping.Clipping(threshold: float = 2.0, *, chunk_size: int = 32)[source]

Bases: PreAggregator

Static norm clipping pre-aggregator.

pre_aggregate(xs: Sequence[Any]) List[Any][source]

Transform a sequence of vectors into a list of pre-aggregated vectors.

Parameters:

xs (Sequence[Any]) – Input sequence of vectors. All vectors must have the same shape and backend. The sequence should be non-empty.

Returns:

List of transformed vectors. Each vector has the same shape and backend as inputs, but the list length may differ from the input length.

Return type:

List[Any]

Raises:

ValueError – If the input sequence is empty or invalid.

class byzpy.pre_aggregators.arc.ARC(f: int = 0, *, chunk_size: int = 32)[source]

Bases: PreAggregator

Adaptive Robust Clipping (ARC) pre-aggregator.

Clips the ⌊2 f / n * (n - f)⌋ largest-norm vectors to the norm of the next-largest remaining vector.
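
A sketch of this clipping rule, assuming PyTorch tensors (tie-breaking and the chunked execution of the real implementation are omitted):

import torch

def arc(xs, f):
    n = len(xs)
    k = int(2 * f / n * (n - f))                 # number of largest-norm vectors to clip
    if k == 0:
        return list(xs)
    norms = torch.stack([torch.norm(x) for x in xs])
    order = torch.argsort(norms, descending=True)
    threshold = norms[order[k]]                  # norm of the next-largest remaining vector
    out = list(xs)
    for i in order[:k].tolist():
        out[i] = xs[i] * (threshold / norms[i].clamp_min(1e-12))
    return out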

pre_aggregate(xs: Sequence[Any]) List[Any][source]

Transform a sequence of vectors into a list of pre-aggregated vectors.

Parameters:

xs (Sequence[Any]) – Input sequence of vectors. All vectors must have the same shape and backend. The sequence should be non-empty.

Returns:

List of transformed vectors. Each vector has the same shape and backend as inputs, but the list length may differ from the input length.

Return type:

List[Any]

Raises:

ValueError – If the input sequence is empty or invalid.

class byzpy.pre_aggregators.nnm.NearestNeighborMixing(f: int, *, feature_chunk_size: int = 8192)[source]

Bases: PreAggregator

NNM (Nearest-Neighbor Mixing):

For each i, replace x_i with the average of its k = n - f nearest neighbors (including itself) by Euclidean distance.

Constructor args:

f: int with 0 <= f < n (validated at call time since n is unknown at init)

Call:

pre_aggregate(xs) -> List[y_1, …, y_n] (same length as xs)
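
A direct sketch of NNM, assuming PyTorch tensors and no feature chunking:

import torch

def nearest_neighbor_mixing(xs, f):
    stacked = torch.stack(list(xs))              # shape (n, d)
    n = stacked.shape[0]
    dists = torch.cdist(stacked, stacked)
    mixed = []
    for i in range(n):
        # k = n - f nearest neighbours of x_i by Euclidean distance, x_i included.
        nearest = torch.topk(dists[i], n - f, largest=False).indices
        mixed.append(stacked[nearest].mean(dim=0))
    return mixed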

pre_aggregate(xs: Sequence[Any]) List[Any][source]

Transform a sequence of vectors into a list of pre-aggregated vectors.

Parameters:

xs (Sequence[Any]) – Input sequence of vectors. All vectors must have the same shape and backend. The sequence should be non-empty.

Returns:

List of transformed vectors. Each vector has the same shape and backend as inputs, but the list length may differ from the input length.

Return type:

List[Any]

Raises:

ValueError – If the input sequence is empty or invalid.

Training Pipelines (PS & P2P)

class byzpy.engine.parameter_server.ps.ParameterServer(honest_nodes: List[HonestNodeActor], byzantine_nodes: List[ByzantineNodeActor], aggregator: Aggregator, pre_aggregator: PreAggregator | None = None, update_byzantines: bool = False, *, actor_pool: ActorPool | None = None, scheduler_metadata: dict | None = None)[source]

Bases: object

Parameter server for synchronous distributed training.

This class orchestrates a parameter server training loop where:

  1. All honest nodes compute gradients in parallel.

  2. Byzantine nodes generate malicious gradients (if present).

  3. Gradients are optionally pre-aggregated.

  4. An aggregator combines all gradients into a single update.

  5. The aggregated gradient is sent back to all nodes.

Parameters:
  • honest_nodes (List[HonestNodeActor]) – List of honest node actors that compute legitimate gradients.

  • byzantine_nodes (List[ByzantineNodeActor]) – List of Byzantine node actors that generate malicious gradients.

  • aggregator (Aggregator) – Aggregator instance to combine gradients (e.g., MultiKrum, Median).

  • pre_aggregator (Optional[PreAggregator], optional) – Optional pre-aggregator to transform gradients before aggregation (e.g., Bucketing, Clipping).

  • update_byzantines (bool, optional) – If True, Byzantine nodes also receive the aggregated gradient update. Default is False.

  • actor_pool (ActorPool | None, optional) – Optional actor pool for parallel aggregation. If None, aggregation runs on the main thread.

  • scheduler_metadata (Optional[dict], optional) – Additional metadata to pass to the graph scheduler.

Examples

>>> from byzpy.aggregators.geometric_wise.krum import MultiKrum
>>> aggregator = MultiKrum(f=2, q=5)
>>> ps = ParameterServer(
...     honest_nodes=honest_actors,
...     byzantine_nodes=byz_actors,
...     aggregator=aggregator
... )
>>> for _ in range(100):
...     await ps.round()
>>> await ps.shutdown()
async round() Tensor[source]

Execute one training round.

This method:

  1. Collects gradients from all honest nodes.

  2. Generates malicious gradients from Byzantine nodes.

  3. Optionally applies pre-aggregation.

  4. Aggregates all gradients.

  5. Sends the aggregated gradient to all nodes.

Returns:

The aggregated gradient vector.

Return type:

torch.Tensor

async shutdown()[source]
class byzpy.engine.peer_to_peer.topology.Edge(u: 'int', v: 'int')[source]

Bases: object

class byzpy.engine.peer_to_peer.topology.Topology(n_nodes: int, edges: Iterable[Tuple[int, int]])[source]

Bases: object

Directed communication graph on V = {0, ..., N-1}. Stores both the out-neighborhood Γ_out(i) and the in-neighborhood Γ_in(i) for every node i.

class byzpy.engine.peer_to_peer.train.PeerToPeer(honest_nodes: List[HonestNodeActor], byzantine_nodes: List[ByzantineNodeActor] | None, topology: Topology, *, lr: float = 0.05, channel_name: str = 'p2p', context_factory: Callable[[str, int], NodeContext] | None = None)[source]

Bases: object

Peer-to-peer training orchestrator.

This class provides a high-level API for peer-to-peer distributed training where nodes communicate directly with their neighbors according to a topology (ring, complete graph, etc.). It wraps the DecentralizedPeerToPeer runner for backwards compatibility.

Parameters:
  • honest_nodes (List[HonestNodeActor]) – List of honest node actors participating in P2P training.

  • byzantine_nodes (Optional[List[ByzantineNodeActor]]) – Optional list of Byzantine node actors.

  • topology (Topology) – Communication topology defining which nodes can communicate.

  • lr (float, optional) – Learning rate for gradient updates. Default is 0.05.

  • channel_name (str, optional) – Channel name for communication (kept for API compatibility). Default is “p2p”.

  • context_factory (Optional[Callable[[str, int], NodeContext]], optional) – Factory function to create node contexts. If None, uses default context creation.

Examples

>>> from byzpy.engine.peer_to_peer.topology import Topology
>>> topology = Topology.ring(n=5, k=1)
>>> p2p = PeerToPeer(
...     honest_nodes=honest_actors,
...     byzantine_nodes=byz_actors,
...     topology=topology,
...     lr=0.01
... )
>>> await p2p.bootstrap()
>>> for _ in range(100):
...     await p2p.round()
>>> await p2p.shutdown()

Engine APIs

Node Applications

class byzpy.engine.node.application.ByzantineNodeApplication(*, name: str, actor_pool: ActorPool | Sequence[ActorPoolConfig], metadata: Mapping[str, Any] | None = None)[source]

Bases: NodeApplication

Application runtime for Byzantine nodes.

This is a convenience wrapper for Byzantine nodes that primarily execute attack pipelines. It reserves the “attack” pipeline name and provides the run_attack() helper method.

Examples

>>> app = ByzantineNodeApplication(
...     name="byz_node0",
...     actor_pool=[ActorPoolConfig(backend="thread", count=1)]
... )
>>> # Register attack pipeline...
>>> malicious_grad = await app.run_attack(inputs={"honest_grads": grads})
ATTACK_PIPELINE = 'attack'
async run_attack(*, inputs: Mapping[str, Any], metadata: Mapping[str, Any] | None = None) Any[source]
run_attack_sync(*, inputs: Mapping[str, Any], metadata: Mapping[str, Any] | None = None) Any[source]
class byzpy.engine.node.application.HonestNodeApplication(*, name: str, actor_pool: ActorPool | Sequence[ActorPoolConfig], metadata: Mapping[str, Any] | None = None)[source]

Bases: NodeApplication

Application runtime for honest nodes.

This is a convenience wrapper that reserves special pipeline names for honest node behaviors:

  • "aggregate": aggregation pipeline for combining gradients.

  • "honest_gradient": pipeline for computing honest gradients.

It provides helper methods like aggregate() and honest_gradient() that automatically use the registered pipelines.

Examples

>>> app = HonestNodeApplication(
...     name="honest_node0",
...     actor_pool=[ActorPoolConfig(backend="thread", count=2)]
... )
>>> # Register aggregation pipeline...
>>> result = await app.aggregate(gradients=[grad1, grad2, grad3])
AGGREGATION_PIPELINE = 'aggregate'
GRADIENT_PIPELINE = 'honest_gradient'
async aggregate(*, gradients: Sequence[Any], metadata: Mapping[str, Any] | None = None) Any[source]
aggregate_sync(*, gradients: Sequence[Any], metadata: Mapping[str, Any] | None = None) Any[source]
async honest_gradient(inputs: Mapping[str, Any], *, metadata: Mapping[str, Any] | None = None) Any[source]
honest_gradient_sync(inputs: Mapping[str, Any], *, metadata: Mapping[str, Any] | None = None) Any[source]
class byzpy.engine.node.application.NodeApplication(*, name: str, actor_pool: ActorPool | Sequence[ActorPoolConfig], metadata: Mapping[str, Any] | None = None)[source]

Bases: object

Application-layer runtime for a single node.

This class manages an actor pool, one or more computation graphs (“pipelines”), and provides helpers to run them through a NodeScheduler. It serves as the bridge between the application layer (aggregators, attacks, etc.) and the scheduling layer (graphs, actors).

A node application can register multiple pipelines (computation graphs) and execute them on demand. Each pipeline can use the shared actor pool for parallel execution.

Parameters:
  • name (str) – Unique name for this node application.

  • actor_pool (ActorPool | Sequence[ActorPoolConfig]) – Either an existing ActorPool instance or a sequence of ActorPoolConfig objects to create a new pool.

  • metadata (Optional[Mapping[str, Any]], optional) – Base metadata to include in all pipeline executions.

Examples

>>> from byzpy.engine.graph.pool import ActorPoolConfig
>>> app = NodeApplication(
...     name="node0",
...     actor_pool=[ActorPoolConfig(backend="thread", count=4)]
... )
>>> # Register and run pipelines...
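
A hedged sketch of the registration/run cycle, assuming a ComputationGraph named graph has been built elsewhere (graph construction is not shown):

>>> app.register_pipeline("aggregate", graph)
>>> app.has_pipeline("aggregate")
True
>>> results = await app.run_pipeline("aggregate", inputs={"gradients": grads})
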
has_pipeline(name: str) bool[source]
list_pipelines() Iterable[str][source]
property pool: ActorPool
register_pipeline(name: str, graph: ComputationGraph, *, metadata: Mapping[str, Any] | None = None) None[source]
async run_pipeline(name: str, inputs: Mapping[str, Any], *, metadata: Mapping[str, Any] | None = None) Dict[str, Any][source]
run_pipeline_sync(name: str, inputs: Mapping[str, Any], *, metadata: Mapping[str, Any] | None = None) Dict[str, Any][source]

Synchronous helper that runs run_pipeline by attaching to the current running loop when possible, or spinning up a temporary event loop.

async shutdown() None[source]
class byzpy.engine.node.application.NodePipeline(graph: ComputationGraph, metadata: Mapping[str, Any] | None = None)[source]

Bases: object

Declarative description of a computation pipeline bound to a node.

graph: ComputationGraph
metadata: Mapping[str, Any] | None = None

Graph Scheduling

class byzpy.engine.graph.scheduler.MessageAwareNodeScheduler(*args, **kwargs)[source]

Bases: NodeScheduler

Extended NodeScheduler with message-driven execution support.

This scheduler extends NodeScheduler to support message-driven computation graphs where nodes can wait for messages from other nodes before proceeding. This is essential for decentralized peer-to-peer training where nodes communicate asynchronously.

The scheduler maintains a message cache and waiters, allowing operators to use MessageTriggerOp to wait for specific message types.

Examples

>>> scheduler = MessageAwareNodeScheduler(graph, pool=pool)
>>> # In another task, deliver a message
>>> scheduler.deliver_message("gradient", payload=gradient_vector)
>>> # In graph execution, wait for message
>>> message = await scheduler.wait_for_message("gradient")
deliver_message(message_type: str, payload: Any) None[source]

Deliver a message, waking up any waiters.

This method delivers a message of the specified type. If there are any tasks waiting for this message type, they are immediately woken up. Otherwise, the message is cached for future waiters.

Parameters:
  • message_type (str) – Type identifier for the message.

  • payload (Any) – The message payload to deliver.

async run(inputs: Mapping[str, Any]) Dict[str, Any][source]

Override to handle message-driven inputs.

async wait_for_message(message_type: str, *, timeout: float | None = None) Any[source]

Wait for a specific message type to arrive.

This method blocks until a message of the specified type is delivered via deliver_message(). If a message is already cached, it returns immediately.

Parameters:
  • message_type (str) – Type identifier for the message to wait for.

  • timeout (Optional[float], optional) – Maximum time to wait in seconds. If None, waits indefinitely. Default is None.

Returns:

The message payload.

Return type:

Any

Raises:

asyncio.TimeoutError – If timeout is exceeded before message arrives.

class byzpy.engine.graph.scheduler.MessageSource(message_type: str, field: str | None = None, timeout: float | None = None)[source]

Bases: object

Represents a graph input that comes from a message.

class byzpy.engine.graph.scheduler.NodeScheduler(graph: ComputationGraph, *, pool: ActorPool | None = None, metadata: Mapping[str, Any] | None = None)[source]

Bases: object

Scheduler for executing computation graphs.

This class evaluates a computation graph by executing nodes in topological order. It dispatches operators to an actor pool for parallel execution when subtasks are supported.

Parameters:
  • graph (ComputationGraph) – The computation graph to execute.

  • pool (ActorPool | None, optional) – Optional actor pool for parallel operator execution. If None, operators run synchronously on the main thread.

  • metadata (Optional[Mapping[str, Any]], optional) – Metadata to include in operator contexts (e.g., node name, pool size).

Examples

>>> from byzpy.engine.graph.graph import ComputationGraph
>>> from byzpy.engine.graph.pool import ActorPool, ActorPoolConfig
>>> graph = make_single_operator_graph(...)
>>> pool = ActorPool([ActorPoolConfig(backend="thread", count=4)])
>>> scheduler = NodeScheduler(graph, pool=pool)
>>> results = await scheduler.run({"gradients": gradients})
async run(inputs: Mapping[str, Any]) Dict[str, Any][source]

Actor Pool

class byzpy.engine.graph.pool.ActorPool(configs: Sequence[ActorPoolConfig])[source]

Bases: object

Pool of actor workers for parallel task execution.

This class manages a collection of actor workers from one or more backend configurations. It schedules subtasks across workers based on task affinity and worker capabilities, enabling efficient parallel execution of computation graph operators.

The pool supports heterogeneous workers (e.g., mix of CPU and GPU workers) and automatically routes tasks to appropriate workers based on their declared capabilities.

Parameters:

configs (Sequence[ActorPoolConfig]) – Sequence of pool configurations. Each config creates a group of workers with the same backend and capabilities.

Examples

>>> from byzpy.engine.graph.pool import ActorPool, ActorPoolConfig
>>> configs = [
...     ActorPoolConfig(backend="thread", count=4),
...     ActorPoolConfig(backend="gpu", count=2)
... ]
>>> pool = ActorPool(configs)
>>> await pool.start()
>>> # Use pool with NodeScheduler...
>>> await pool.shutdown()

Notes

  • Workers are lazily started when start() is called.

  • Tasks are scheduled based on SubTask affinity hints.

  • The pool supports channel-based communication between workers.

async open_channel(name: str) ActorPoolChannel[source]

Bind a channel with the given name on every worker in the pool and return an ActorPoolChannel that exposes per-worker send/receive helpers. Reuses existing bindings when called multiple times with the same name.

async run_many(subtasks: Sequence[SubTask]) List[Any][source]
async run_subtask(subtask: SubTask) Any[source]
async shutdown() None[source]
property size: int
async start() None[source]
worker_affinities() Sequence[str][source]
class byzpy.engine.graph.pool.ActorPoolChannel(*, name: str, channels: Mapping[str, ChannelRef], endpoints: Mapping[str, Endpoint])[source]

Bases: object

Wrapper around a channel bound on each worker in an ActorPool. Provides convenient send/recv helpers keyed by worker name.

channel(worker: str) ChannelRef[source]
endpoint(worker: str) Endpoint[source]
async recv(worker: str, *, timeout: float | None = None) Any[source]
async send(sender: str, recipient: str, payload: Any) None[source]
property workers: Sequence[str]
class byzpy.engine.graph.pool.ActorPoolConfig(backend: str | ActorBackend, count: int = 1, capabilities: Sequence[str] | None = None, name: str | None = None)[source]

Bases: object

Configuration for a pool of actors of the same type.

This dataclass specifies how to create a group of actor workers with the same backend and capabilities.

Parameters:
  • backend (Union[str, ActorBackend]) – Backend specification. Can be a string (“thread”, “process”, “gpu”, “tcp://host:port”, “ucx://host:port”) or an ActorBackend instance.

  • count (int, optional) – Number of workers to create. Default is 1.

  • capabilities (Sequence[str] | None, optional) – Explicit capability tags for these workers. If None, capabilities are inferred from the backend. Default is None.

  • name (Optional[str], optional) – Optional name prefix for worker identification. Default is None.

Examples

>>> config = ActorPoolConfig(backend="thread", count=4, name="cpu-workers")
>>> pool = ActorPool([config])
>>> await pool.start()
backend: str | ActorBackend
capabilities: Sequence[str] | None = None
count: int = 1
name: str | None = None
resolved_capabilities() Sequence[str][source]

Actor Factory

Shared helpers for turning user specs into actor backend instances.

byzpy.engine.actor.factory.resolve_backend(spec: str | ActorBackend) ActorBackend[source]

Resolve an actor backend from a specification string or instance.

This function converts backend specification strings into ActorBackend instances. It supports various backend types including threads, processes, GPUs, and remote actors.

Parameters:

spec (Union[str, ActorBackend]) – Backend specification. Can be:

  • "thread": thread-based actors in the current process.

  • "process": process-based actors in separate processes.

  • "gpu": GPU-based actors for CUDA execution.

  • "tcp://host:port": remote actors via TCP.

  • "ucx://host:port": remote actors via UCX (for GPU clusters).

  • An existing ActorBackend instance (returned as-is).

Returns:

The resolved actor backend instance.

Return type:

ActorBackend

Raises:

ValueError – If the specification string is not recognized.

Examples

>>> backend = resolve_backend("thread")
>>> isinstance(backend, ThreadActorBackend)
True
>>> backend = resolve_backend("tcp://localhost:29000")
>>> isinstance(backend, RemoteActorBackend)
True