Last active
February 6, 2026 20:27
-
-
Save mahdilamb/b404ecc37f50681746ea2bc8f0e2ee00 to your computer and use it in GitHub Desktop.
NumPy implementation of OpenSearch age decay
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.13" | |
| # dependencies = [ | |
| # "matplotlib>=3.10.8", | |
| # "numpy>=2.4.2", | |
| # ] | |
| # | |
| # /// | |
| import datetime | |
| import math | |
| from types import MappingProxyType | |
| from typing import Any, Mapping | |
| import numpy as np | |
| import ast | |
# Number of seconds in one day; used to convert second-based timestamps to days.
SECONDS_IN_DAY = 24 * 60 * 60

# Read-only table of the functions a ranking expression may call by default.
# The key order ("log", then "exp") is kept because it shows up in error messages.
RANKING_EXPRESSION_FUNCTIONS = MappingProxyType(dict(log=np.log, exp=np.exp))
def today_timestamp() -> int:
    """Return the timestamp of today's midnight as a whole number of seconds.

    The date comes from ``datetime.date.today()`` and is combined with the
    time 00:00:00 into a naive ``datetime`` before being converted with
    ``.timestamp()`` and truncated to ``int``.
    """
    midnight = datetime.datetime.combine(datetime.date.today(), datetime.time.min)
    return int(midnight.timestamp())
# Timestamp of today's midnight, captured once when this module is loaded.
# The NumPy decay functions below subtract it from their input timestamps.
NOW = today_timestamp()
def linear_decay(
    x: np.ndarray, origin: int, scale: int, offset: int = 0, decay: float = 0.5
):
    """Compute a linear decay score for second-based timestamps.

    ``x`` is first converted to days relative to ``NOW``; ``origin``,
    ``scale`` and ``offset`` are therefore compared against values in days.

    Args:
        x: Timestamps in seconds.
        origin: Point (in days relative to ``NOW``) where the score is 1.
        scale: Distance beyond ``offset`` at which the score equals ``decay``.
        offset: Distance from ``origin`` within which the score stays at 1.
        decay: Score reached at ``scale`` distance beyond ``offset``.

    Returns:
        Scores clipped below at 0, falling linearly with distance.
    """
    days_from_now = (x - NOW) / SECONDS_IN_DAY
    beyond_offset = np.abs(days_from_now - origin) - offset
    dist = np.maximum(0, beyond_offset)
    # Distance at which the straight line reaches zero.
    zero_point = scale / (1.0 - decay)
    score = (zero_point - dist) / zero_point
    return np.maximum(0.0, score)
def exp_decay(
    x: np.ndarray, origin: int, scale: int, offset: int = 0, decay: float = 0.5
):
    """Compute an exponential decay score for second-based timestamps.

    ``x`` is first converted to days relative to ``NOW``; ``origin``,
    ``scale`` and ``offset`` are therefore compared against values in days.

    Args:
        x: Timestamps in seconds.
        origin: Point (in days relative to ``NOW``) where the score is 1.
        scale: Distance beyond ``offset`` at which the score equals ``decay``.
        offset: Distance from ``origin`` within which the score stays at 1.
        decay: Score reached at ``scale`` distance beyond ``offset``.

    Returns:
        ``exp(log(decay) / scale * distance)`` for each element of ``x``.
    """
    days_from_now = (x - NOW) / SECONDS_IN_DAY
    beyond_offset = np.abs(days_from_now - origin) - offset
    dist = np.maximum(0, beyond_offset)
    # Negative for decay < 1, so the score shrinks as the distance grows.
    rate = np.log(decay) / scale
    return np.exp(rate * dist)
def gauss_decay(
    x: np.ndarray, origin: int, scale: int, offset: int = 0, decay: float = 0.5
):
    """Compute a Gaussian decay score for second-based timestamps.

    ``x`` is first converted to days relative to ``NOW``; ``origin``,
    ``scale`` and ``offset`` are therefore compared against values in days.

    Args:
        x: Timestamps in seconds.
        origin: Point (in days relative to ``NOW``) where the score is 1.
        scale: Distance beyond ``offset`` at which the score equals ``decay``.
        offset: Distance from ``origin`` within which the score stays at 1.
        decay: Score reached at ``scale`` distance beyond ``offset``.

    Returns:
        ``exp(0.5 * distance**2 / scaling)`` for each element of ``x``, where
        ``scaling = 0.5 * scale**2 / log(decay)``.
    """
    days_from_now = (x - NOW) / SECONDS_IN_DAY
    beyond_offset = np.abs(days_from_now - origin) - offset
    dist = np.maximum(0, beyond_offset)
    # Negative for decay < 1, which turns the exponent into a falling bell curve.
    width = 0.5 * np.power(scale, 2.0) / np.log(decay)
    exponent = 0.5 * np.power(dist, 2.0) / width
    return np.exp(exponent)
def safe_eval(
    expr: str,
    *,
    allowed_functions: Mapping[str, Any] = RANKING_EXPRESSION_FUNCTIONS,
    **data: Any,
):
    """Evaluate a restricted arithmetic expression without using ``eval``.

    The expression is parsed with ``ast`` and may only contain constants,
    variable names, unary ``+``/``-``, binary ``+``/``*`` and calls to the
    functions listed in ``allowed_functions``. Anything else is rejected
    before any part of the expression is evaluated.

    Args:
        expr: The expression source text.
        allowed_functions: Mapping from callable name to the function invoked
            when the expression calls that name.
        **data: Values for the variable names used in the expression. Any
            value is accepted (arrays as well as plain scalars).

    Returns:
        The value the expression evaluates to.

    Raises:
        SyntaxError: If ``expr`` cannot be parsed as an expression.
        ValueError: If the expression uses a disallowed construct, operator
            or function.
        KeyError: If the expression references a name missing from ``data``.
    """

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.BinOp):
            if isinstance(node.op, ast.Add):
                return _eval(node.left) + _eval(node.right)
            if isinstance(node.op, ast.Mult):
                return _eval(node.left) * _eval(node.right)
            raise ValueError("Only + and * are allowed")
        if isinstance(node, ast.UnaryOp):
            if isinstance(node.op, ast.UAdd):
                return +_eval(node.operand)
            if isinstance(node.op, ast.USub):
                return -_eval(node.operand)
        if isinstance(node, ast.Call):
            if not isinstance(node.func, ast.Name):
                raise ValueError("Invalid function call")
            if node.func.id not in allowed_functions:
                raise ValueError(f"Only {tuple(allowed_functions.keys())} are allowed")
            return allowed_functions[node.func.id](*[_eval(arg) for arg in node.args])
        if isinstance(node, ast.Name):
            # Variables are caller-supplied values, not syntax nodes, so they
            # are returned as-is. Feeding them back through _eval rejected
            # every value that was not a NumPy array (e.g. a float weight).
            return data[node.id]
        raise ValueError(f"Disallowed expression: {type(node).__name__}")

    tree = ast.parse(expr, mode="eval")
    # Whitelist check over the whole tree first, so a disallowed construct
    # anywhere in the expression is rejected before evaluation starts.
    for node in ast.walk(tree):
        if not isinstance(
            node,
            (
                ast.Expression,
                ast.BinOp,
                ast.Add,
                ast.Mult,
                ast.Call,
                ast.Name,
                ast.Constant,
                ast.Load,
                ast.UnaryOp,
                ast.USub,
                ast.UAdd,
            ),
        ):
            raise ValueError(f"Disallowed syntax: {type(node).__name__}")
    return _eval(tree)
def eval_ranking_expression(d: np.ndarray, formula: str, **functions):
    """Evaluate ``formula`` with ``safe_eval``, exposing ``d`` as the variable ``d``.

    Args:
        d: Value bound to the name ``d`` inside the formula.
        formula: Expression text to evaluate.
        **functions: Callables the formula may invoke, keyed by name.

    Returns:
        Whatever ``safe_eval`` returns for the formula.
    """
    variables = {"d": d}
    return safe_eval(formula, allowed_functions=functions, **variables)
| def approx_gaussian_decay( | |
| *, | |
| scale: int, | |
| origin: int | None = None, | |
| offset: int = 0, | |
| decay: float = 0.5, | |
| field: str | None = None, | |
| ) -> str: | |
| if field is None: | |
| if origin is not None: | |
| raise ValueError("Cannot have a custom origin if using document_age") | |
| document_age_in_days = ( | |
| f"(document_age*{1 / 24})" # Convert document age to days | |
| ) | |
| else: | |
| now = today_timestamp() | |
| seconds_to_days = 1 / SECONDS_IN_DAY | |
| document_age_in_days = f"(({field} + -{now})*{seconds_to_days})" | |
| distance = f"(({document_age_in_days} + -{origin}) + -{offset})" | |
| scaling = 0.5 * math.pow(scale, 2.0) / math.log(decay) | |
| return f"exp(0.5 * ({distance} * {distance}) * {1 / scaling})" | |
| def full_gaussian_decay( | |
| *, | |
| scale: int, | |
| origin: int | None = None, | |
| offset: int = 0, | |
| decay: float = 0.5, | |
| field: str | None = None, | |
| ) -> str: | |
| if field is None: | |
| if origin is not None: | |
| raise ValueError("Cannot have a custom origin if using document_age") | |
| document_age_in_days = ( | |
| f"(document_age*{1 / 24})" # Convert document age to days | |
| ) | |
| else: | |
| now = today_timestamp() | |
| seconds_to_days = 1 / SECONDS_IN_DAY | |
| document_age_in_days = f"(({field} + -{now})*{seconds_to_days})" | |
| distance = f"max(0,(abs({document_age_in_days} + -{origin}) + -{offset}))" | |
| scaling = 0.5 * math.pow(scale, 2.0) / math.log(decay) | |
| return f"exp(0.5 * ({distance} * {distance}) * {1 / scaling})" | |
if __name__ == "__main__":
    # Demo: plot the three NumPy decay curves next to the two curves obtained
    # by evaluating the generated expression strings.
    import matplotlib.pyplot as plt

    origin, offset, scale, decay = 0, 4, 2, 0.9
    date_span = 20
    shape = dict(origin=origin, offset=offset, scale=scale, decay=decay)

    # One sample per day from `date_span` days before to `date_span` days
    # after the origin, expressed as second-based timestamps.
    first_second = ((origin - date_span) * SECONDS_IN_DAY) + NOW
    last_second = ((origin + date_span) * SECONDS_IN_DAY) + NOW
    d = np.linspace(first_second, last_second, (date_span * 2) + 1, dtype=np.int64)

    lin = linear_decay(d, **shape)
    exp = exp_decay(d, **shape)
    gau = gauss_decay(d, **shape)
    custom = eval_ranking_expression(
        d,
        approx_gaussian_decay(field="d", **shape),
        **RANKING_EXPRESSION_FUNCTIONS,
    )
    custom_gaussian = eval_ranking_expression(
        d,
        full_gaussian_decay(field="d", **shape),
        exp=np.exp,
        log=np.log,
        max=np.maximum,
        abs=np.abs,
    )

    plt.figure(figsize=(8, 5))
    # Drawn in this order so later curves are painted on top of earlier ones.
    curves = (
        (gau, {"label": "Gaussian", "color": "orange"}),
        (exp, {"label": "Exponential", "color": "blue"}),
        (lin, {"label": "Linear", "color": "green"}),
        (custom, {"label": "Custom", "color": "red"}),
        (
            custom_gaussian,
            {"label": "Custom gaussian", "color": "cyan", "linestyle": "dashed"},
        ),
    )
    for values, look in curves:
        plt.plot(d, values, linewidth=2, **look)

    # Only the part of the x axis from NOW onwards is shown.
    plt.xlim((NOW, d.max()))
    plt.legend()
    plt.grid(True)
    plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment