"""Source code for continuum.frontend.optimizer."""

import random


class TensorOpt:
    """Local optimizer for numeric tensor-like scalar parameters.

    Performs a one-step hill climb: evaluates the current value plus and
    minus the learning rate and keeps whichever scores strictly best.
    """

    def __init__(self, lr: float, rng: random.Random):
        self.lr = float(lr)
        self.rng = rng

    def update(self, p, evaluate):
        """Try p.value +/- lr; keep the best scorer.

        Returns False when p.value is not numeric, True otherwise.
        `evaluate` is a zero-arg callable scoring the current p.value.
        """
        if not isinstance(p.value, (int, float)):
            return False
        base = float(p.value)
        winner = base
        top = evaluate()  # baseline score at the current value
        # Both candidates derive from the original value, not a moving one.
        for cand in (base + self.lr, base - self.lr):
            p.value = cand
            cand_score = evaluate()
            if cand_score > top:
                top = cand_score
                winner = cand
        p.value = winner
        return True
[docs] class TextGradOpt: """Text mutation optimizer used for prompt-style parameters.""" def __init__(self, lr: float, rng: random.Random): self.lr = float(lr) self.rng = rng def _mutate(self, text, tokens): words = text.split() if not words: words = [self.rng.choice(tokens)] return " ".join(words) do_add = self.rng.random() < 0.5 or len(words) == 1 if do_add: idx = self.rng.randrange(0, len(words) + 1) tok = self.rng.choice(tokens) words.insert(idx, tok) else: idx = self.rng.randrange(0, len(words)) words.pop(idx) return " ".join(words).strip()
[docs] def update(self, p, evaluate): current = str(p.value) best_text = current best_score = evaluate() token_pool = p.metadata.get("tokens") if not token_pool: token_pool = [w for w in current.split() if w] if not token_pool: token_pool = ["good", "better", "best"] trials = max(1, int(p.metadata.get("trials", max(1, self.lr * 4)))) for _ in range(trials): candidate = self._mutate(best_text, token_pool) p.value = candidate score = evaluate() if score > best_score: best_score = score best_text = candidate p.value = best_text return True
class GEPAOpt:
    """Discrete choice optimizer for categorical parameters.

    Exhaustively scores every declared choice and keeps the winner;
    the incumbent value survives ties.
    """

    def __init__(self, rng: random.Random):
        # rng kept for interface parity with the other optimizers.
        self.rng = rng

    def update(self, p, evaluate):
        """Scan p.metadata["choices"]; keep the best-scoring value.

        Returns False when no choices are declared, True otherwise.
        """
        options = p.metadata.get("choices")
        if not options:
            return False
        winner = p.value
        top = evaluate()  # score of the incumbent value
        for option in options:
            p.value = option
            option_score = evaluate()
            if option_score > top:
                top = option_score
                winner = option
        p.value = winner
        return True
class BayesOpt:
    """Simple random-search optimizer over bounded continuous values."""

    def __init__(self, rng: random.Random):
        self.rng = rng

    def update(self, p, evaluate):
        """Sample uniformly in [min, max] for a trial budget; keep the best.

        Bounds default to current value +/- 1.0; budget defaults to 8
        trials (at least one). Returns False for non-numeric p.value.
        """
        if not isinstance(p.value, (int, float)):
            return False
        start = float(p.value)
        lo = float(p.metadata.get("min", start - 1.0))
        hi = float(p.metadata.get("max", start + 1.0))
        budget = int(p.metadata.get("trials", 8))
        winner = start
        top = evaluate()  # baseline score at the current value
        for _ in range(max(1, budget)):
            draw = self.rng.uniform(lo, hi)
            p.value = draw
            draw_score = evaluate()
            if draw_score > top:
                top = draw_score
                winner = draw
        p.value = winner
        return True
class Optimizer:
    """Unified optimizer facade for all Continuum parameter kinds.

    Routes each of the program's parameters to the sub-optimizer that
    matches its ``kind`` ("tensor", "text", "discrete", "continuous");
    unknown kinds are silently skipped.
    """

    def __init__(self, program, metric, *, lr_tensor=1e-1, lr_text=1.0, seed=0):
        self.program = program
        self.metric = metric
        # One shared seeded RNG keeps the whole run reproducible.
        self.rng = random.Random(seed)
        self._tensor = TensorOpt(lr=lr_tensor, rng=self.rng)
        self._text = TextGradOpt(lr=lr_text, rng=self.rng)
        self._discrete = GEPAOpt(rng=self.rng)
        self._continuous = BayesOpt(rng=self.rng)

    def _score(self, batch):
        """Mean metric over *batch*; 0.0 for an empty batch."""
        results = [self.metric(self.program(x), x) for x in batch]
        if not results:
            return 0.0
        return float(sum(results) / len(results))

    def step(self, batch):
        """Run one optimization pass over every program parameter."""
        if not hasattr(self.program, "parameters"):
            return
        def evaluate():
            return self._score(batch)
        routes = {
            "tensor": self._tensor,
            "text": self._text,
            "discrete": self._discrete,
            "continuous": self._continuous,
        }
        # Materialize first so updates can't disturb a live generator.
        for p in list(self.program.parameters()):
            sub = routes.get(p.kind)
            if sub is not None:
                sub.update(p, evaluate)

    def fit(self, dataset, epochs=1):
        """Repeat step() over every batch of *dataset*, *epochs* times."""
        for _ in range(epochs):
            for batch in dataset.batches():
                self.step(batch)