Coverage for src/debputy/plugin/plugin_state.py: 57%

62 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import contextvars 

2import functools 

3import inspect 

4from contextvars import ContextVar 

5from typing import Optional, ParamSpec, TypeVar, NoReturn, Union 

6from collections.abc import Callable 

7 

8from debputy.exceptions import ( 

9 UnhandledOrUnexpectedErrorFromPluginError, 

10 DebputyRuntimeError, 

11) 

12from debputy.util import _trace_log, _is_trace_log_enabled 

13 

14_current_debputy_plugin_cxt_var: ContextVar[str | None] = ContextVar( 

15 "current_debputy_plugin", 

16 default=None, 

17) 

18 

19P = ParamSpec("P") 

20R = TypeVar("R") 

21 

22 

def current_debputy_plugin_if_present() -> str | None:
    """Look up the plugin name recorded in the current context.

    :return: The value of the plugin context variable, or ``None`` when no
      plugin has been set in the current context.
    """
    active_plugin = _current_debputy_plugin_cxt_var.get()
    return active_plugin

25 

26 

def current_debputy_plugin_required() -> str:
    """Return the plugin name recorded in the current context.

    Unlike `current_debputy_plugin_if_present`, the absence of a plugin is
    treated as a programming error.

    :return: The name of the plugin set in the current context.
    :raises AssertionError: If no plugin is set in the current context.
    """
    active_plugin = current_debputy_plugin_if_present()
    if active_plugin is not None:
        return active_plugin
    raise AssertionError(
        "current_debputy_plugin_required() was called, but no plugin was set."
    )

34 

35 

def wrap_plugin_code(
    plugin_name: str,
    func: Callable[P, R],
    *,
    non_debputy_exception_handling: bool | Callable[[Exception], NoReturn] = True,
) -> Callable[P, R]:
    """Create a callable that runs `func` in the context of `plugin_name`.

    The returned callable carries the metadata of `func` (name, docstring,
    etc.) and forwards all of its arguments to `func`.

    :param plugin_name: Name of the plugin that `func` belongs to.
    :param func: The callable to wrap.
    :param non_debputy_exception_handling: Decides what happens to exceptions
      that are not a `DebputyRuntimeError`:

      * ``True``: the call goes through `run_in_context_of_plugin_wrap_errors`.
      * ``False``: the call goes through `run_in_context_of_plugin` and
        exceptions propagate as they are.
      * a callable: the callable is invoked with the exception. It is
        declared as `NoReturn`, so it is expected to raise.

      A `DebputyRuntimeError` is always propagated unchanged.
    :return: The wrapping callable.
    """
    if not isinstance(non_debputy_exception_handling, bool):
        # The caller provided its own handler for non-debputy exceptions.
        handle_foreign_error = non_debputy_exception_handling

        @functools.wraps(func)
        def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            try:
                return run_in_context_of_plugin(plugin_name, func, *args, **kwargs)
            except DebputyRuntimeError:
                raise
            except Exception as e:
                handle_foreign_error(e)

        return _wrapper

    # Boolean flag: the runner is chosen once, at wrap time.
    runner = (
        run_in_context_of_plugin_wrap_errors
        if non_debputy_exception_handling
        else run_in_context_of_plugin
    )

    @functools.wraps(func)
    def _plugin_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        return runner(plugin_name, func, *args, **kwargs)

    return _plugin_wrapper

64 

65 

def _describe_external_call_site() -> str:
    """Describe the nearest stack frame that does not belong to this module.

    Kept as a separate function so that every reference to frame objects is
    released when it returns, rather than lingering in the locals of
    `run_in_context_of_plugin` while the plugin code runs.

    :return: ``"<filename>:<lineno> (<function>)"`` for the first frame whose
      filename differs from this module's ``__file__``, or ``"[N/A]"`` if no
      such frame is found.
    """
    # context=0: only the filename, line number and function name are used, so
    # `inspect` does not need to read source lines for every frame.
    call_stack = inspect.stack(context=0)
    try:
        for frame_info in call_stack:
            if frame_info.filename == __file__:
                continue
            # `co_qualname` only exists on Python 3.11+; fall back to the
            # plain function name when it is missing.
            fname = getattr(frame_info.frame.f_code, "co_qualname", None)
            if fname is None:
                fname = frame_info.function
            return f"{frame_info.filename}:{frame_info.lineno} ({fname})"
        return "[N/A]"
    finally:
        # Do not keep the reference longer than necessary
        del call_stack


def run_in_context_of_plugin(
    plugin: str,
    func: Callable[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Call `func` with `plugin` recorded as the current plugin.

    The current context is copied, the plugin context variable is set inside
    the copy and `func` is then run inside that copy. The change is therefore
    made to the copy and not to the caller's own context.

    When trace logging is enabled, the switch is logged together with the
    first call site found outside this module.

    :param plugin: Name of the plugin to record for the duration of the call.
    :param func: The callable to run.
    :param args: Positional arguments passed on to `func`.
    :param kwargs: Keyword arguments passed on to `func`.
    :return: Whatever `func` returns.
    """
    context = contextvars.copy_context()
    if _is_trace_log_enabled():
        caller = _describe_external_call_site()
        _trace_log(
            f"Switching plugin context to {plugin} at {caller} (from context: {current_debputy_plugin_if_present()})"
        )
    # Wish we could just do a regular set without wrapping it in `context.run`
    context.run(_current_debputy_plugin_cxt_var.set, plugin)
    return context.run(func, *args, **kwargs)

94 

95 

def run_in_context_of_plugin_wrap_errors(
    plugin: str,
    func: Callable[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Call `func` in the context of `plugin` and wrap unexpected exceptions.

    :param plugin: Name of the plugin to record for the duration of the call.
    :param func: The callable to run.
    :param args: Positional arguments passed on to `func`.
    :param kwargs: Keyword arguments passed on to `func`.
    :return: Whatever `func` returns.
    :raises DebputyRuntimeError: Re-raised unchanged when `func` raises it.
    :raises UnhandledOrUnexpectedErrorFromPluginError: When `func` raises any
      other `Exception` and `plugin` is not ``"debputy"``. The original
      exception is chained as the cause.
    :raises AssertionError: When `func` raises any other `Exception` and
      `plugin` is ``"debputy"``. The original exception is chained as the
      cause.
    """
    try:
        return run_in_context_of_plugin(plugin, func, *args, **kwargs)
    except DebputyRuntimeError:
        raise
    except Exception as e:
        if plugin == "debputy":
            raise AssertionError(
                "Bug in the `debputy` plugin: Unhandled exception."
            ) from e
        # Not every callable has `__qualname__` (e.g. `functools.partial`
        # objects or callable instances). Fall back to `repr` so that building
        # the message cannot fail with an AttributeError and hide the report.
        func_name = getattr(func, "__qualname__", None)
        if func_name is None:
            func_name = repr(func)
        raise UnhandledOrUnexpectedErrorFromPluginError(
            f"{func_name} from the plugin {plugin} raised exception that was not expected here."
        ) from e

113 "Bug in the `debputy` plugin: Unhandled exception." 

114 ) from e