
Reference Book

Database

This is the Database class. It holds all data related to a single disease. It also holds an emulator which can be used to create results for scenarios that are not stored.
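
A minimal usage sketch is shown below (the filehandler objects and the country, scenario and indicator values are illustrative assumptions, not part of this reference):

from tgftools.database import Database

# Filehandlers (Gp, PartnerData, PFInputData, ModelResults) loaded elsewhere
db = Database(
    gp=gp,
    partner_data=partner_data,
    pf_input_data=pf_input_data,
    model_results=model_results,
)

# Assemble model, PF and partner data for one country/scenario/indicator
df = db.get_country(
    country="UGA",
    scenario_descriptor="default",
    funding_fraction=0.9,
    indicator="cases",
)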

Source code in src/tgftools/database.py
class Database:
    """This is the Database class. It holds all data related to a single disease. It also holds an emulator which
    can be used to create results for scenarios that are not stored."""

    def __init__(
        self,
        gp: Optional[Gp] = None,
        partner_data: Optional[PartnerData] = None,
        pf_input_data: Optional[PFInputData] = None,
        model_results: Optional[ModelResults] = None,
    ):
        self.gp = gp
        self.partner_data = partner_data
        self.pf_input_data = pf_input_data
        self.model_results = model_results
        self.disease_name = model_results.disease_name

        # Check that all these filehandlers have the same disease_name (if they are defined)
        disease_name_where_args_defined = np.array([x.disease_name for x in (gp, partner_data, pf_input_data, model_results) if x is not None])
        assert (disease_name_where_args_defined == disease_name_where_args_defined[0]).all()

    def get_country(
        self,
        country: str,
        scenario_descriptor: str,
        funding_fraction: float,
        indicator: str,
    ) -> pd.DataFrame:
        """
        Data for a particular country, scenario_descriptor, funding_fraction and indicator.

        Args:
            country: The country (ISO3 code)
            scenario_descriptor: The scenario descriptor (e.g. 'default')
            funding_fraction: The funding fraction (e.g. 0.9)
            indicator: The indicator (e.g. 'cases')

        Returns:
            Dataframe that assembles the information from all sources for a particular country, for a particular
             scenario, funding_fraction and indicator. (If the indicator is not found within the
             pf_input_data or partner_data, then NaNs are used instead.)
        """
        _model = self.model_results.df.loc[
            (scenario_descriptor, funding_fraction, country, slice(None), indicator)
        ].add_prefix("model_")

        try:
            _pf = self.pf_input_data.df.loc[
                (scenario_descriptor, country, slice(None), indicator)
            ].add_prefix("pf_")
        except KeyError:
            _pf = pd.DataFrame(
                index=_model.index,
                columns=["pf_" + c for c in ("low", "central", "high")],
                data=float("nan"),
            )

        try:
            _partner = self.partner_data.df.loc[
                (scenario_descriptor, country, slice(None), indicator)
            ].add_prefix("partner_")
        except KeyError:
            _partner = pd.DataFrame(
                index=_model.index,
                columns=["partner_" + c for c in ("low", "central", "high")],
                data=float("nan"),
            )

        return pd.concat([_model, _pf, _partner], axis=1).sort_index()

get_country(country, scenario_descriptor, funding_fraction, indicator)

Data for a particular country, scenario_descriptor, funding_fraction and indicator.

Parameters:

    country (str, required): The country (ISO3 code)
    scenario_descriptor (str, required): The scenario descriptor (e.g. 'default')
    funding_fraction (float, required): The funding fraction (e.g. 0.9)
    indicator (str, required): The indicator (e.g. 'cases')

Returns:

    DataFrame: Dataframe that assembles the information from all sources for a particular country, for a particular scenario, funding_fraction and indicator. (If the indicator is not found within the pf_input_data or partner_data, then NaNs are used instead.)
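
As a hedged illustration of the returned layout (column names inferred from the add_prefix calls and the low/central/high fallback columns in the source below):

df = db.get_country("UGA", "default", 0.9, "cases")
print(sorted(df.columns))
# Expect columns prefixed by source, e.g. 'model_central', 'pf_central' and 'partner_central',
# alongside the corresponding '..._low' and '..._high' columns; PF or partner indicators
# that are not found are filled with NaN.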

Source code in src/tgftools/database.py
def get_country(
    self,
    country: str,
    scenario_descriptor: str,
    funding_fraction: float,
    indicator: str,
) -> pd.DataFrame:
    """
    Data for a particular country, scenario_descriptor, funding_fraction and indicator.

    Args:
        country: The country (ISO3 code)
        scenario_descriptor: The scenario descriptor (e.g. 'default')
        funding_fraction: The funding fraction (e.g. 0.9)
        indicator: The indicator (e.g. 'cases')

    Returns:
        Dataframe that assembles the information from all sources for a particular country, for a particular
         scenario, funding_fraction and indicator. (If the indicator is not found within the
         pf_input_data or partner_data, then NaNs are used instead.)
    """
    _model = self.model_results.df.loc[
        (scenario_descriptor, funding_fraction, country, slice(None), indicator)
    ].add_prefix("model_")

    try:
        _pf = self.pf_input_data.df.loc[
            (scenario_descriptor, country, slice(None), indicator)
        ].add_prefix("pf_")
    except KeyError:
        _pf = pd.DataFrame(
            index=_model.index,
            columns=["pf_" + c for c in ("low", "central", "high")],
            data=float("nan"),
        )

    try:
        _partner = self.partner_data.df.loc[
            (scenario_descriptor, country, slice(None), indicator)
        ].add_prefix("partner_")
    except KeyError:
        _partner = pd.DataFrame(
            index=_model.index,
            columns=["partner_" + c for c in ("low", "central", "high")],
            data=float("nan"),
        )

    return pd.concat([_model, _pf, _partner], axis=1).sort_index()

CheckReport dataclass

Dataclass for the report to be saved after running a single check

Source code in src/tgftools/checks.py
@dataclass
class CheckReport:
    """DataClass for report to be saved from having run a single check"""

    name: str = None
    description: str = None
    is_critical: bool = False
    passes: bool = False
    message: [
        str,
        matplotlib.figure.Figure,
        list[str],
        list[matplotlib.figure.Figure],
    ] = None

CheckResult dataclass

Dataclass for the result of a single check
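
For illustration, a check may return None (treated as a pass) or construct a CheckResult explicitly; the values below are hypothetical:

# A failing result with an explanatory message (a str, a matplotlib Figure,
# or a list of either is accepted)
result = CheckResult(
    passes=False,
    message="Cases increased unexpectedly between 2020 and 2021.",
)

# A passing result needs no message
ok = CheckResult(passes=True, message="")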

Source code in src/tgftools/checks.py
@dataclass
class CheckResult:
    """DataClass for result of a single check"""

    passes: bool = None
    message: [
        str,
        matplotlib.figure.Figure,
        list[str],
        list[matplotlib.figure.Figure],
    ] = None

ConsolidatedChecksReport

This class is used to capture the reports from individual checks and compile them into a consolidated report, which is printed to the console and written to a pdf.
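
A sketch of driving this class directly (normally DatabaseChecks does this for you; the title, doc and filename values here are hypothetical):

ccr = ConsolidatedChecksReport(
    title="MyDiseaseChecks",
    doc="Checks run on the MyDisease database.",
    filenames={"Model Results": "model_results.xlsx"},
)
ccr.add_check_report(
    CheckReport(
        name="example_check",
        description="An example check.",
        is_critical=False,
        passes=True,
        message="",
    )
)
ccr.report(filename=None, verbose=False)  # console only; pass a Path to also write a pdf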

Source code in src/tgftools/checks.py
class ConsolidatedChecksReport:
    """This class is used to capture the reports from individual checks and compile them into a consolidated report,
    which is printed to the console and written to a pdf."""

    def __init__(self, title: str, doc: str, filenames: Dict):
        self._title = title
        self._doc = doc
        self._filenames = filenames
        self._check_reports = list()

        # Create empty list for the "flowables" for the pdf generation
        self.flowables = []

        # Load components for pdf generation
        self.styles = getSampleStyleSheet()
        self.spacer = Spacer(1, 0.25 * inch)
        self.small_spacer = Spacer(1, 0.1 * inch)
        self.horizontal_line = HRFlowable()

    def add_check_report(self, ch_rep: CheckReport = None):
        """Add the result of a check"""

        # If the message is an empty list, replace it with None
        if isinstance(ch_rep.message, list) and len(ch_rep.message) == 0:
            ch_rep.message = None

        # Check that the message is of the right type (or none)
        if ch_rep.message is not None:
            item_types = (pd.DataFrame, plt.Figure, str, tuple)
            single_element = (
                ch_rep.message[0]
                if isinstance(ch_rep.message, list)
                else ch_rep.message
            )
            assert isinstance(single_element, item_types), (
                f"Message is of the wrong type {ch_rep.message=}, "
                f"{single_element=}, type: {type(single_element)}"
            )

        # Add to internal storage list of CheckReports
        self._check_reports.append(ch_rep)

    @property
    def passing_checks(self) -> List:
        return [rep for rep in self._check_reports if rep.passes]

    @property
    def non_critical_failing_checks(self) -> List:
        return [
            rep for rep in self._check_reports if not rep.passes and not rep.is_critical
        ]

    @property
    def critical_failing_checks(self) -> List:
        return [
            rep for rep in self._check_reports if not rep.passes and rep.is_critical
        ]

    @property
    def any_fails(self) -> bool:
        return any(self.critical_failing_checks) or any(
            self.non_critical_failing_checks
        )

    def _print(self, item, style=None, echo_to_console=True) -> None:
        """Print to console and add into a 'flowables' list for pdf generation."""

        if style is None:
            style = self.styles["Normal"]

        def handle_item(this_item):
            if isinstance(this_item, str):
                if this_item == "\n":
                    # Handle blank line command
                    if echo_to_console:
                        print("\n")
                    self.flowables.append(self.spacer)

                elif this_item == "---":
                    # Insert horizontal line
                    if echo_to_console:
                        print("--------------------------------------------------")
                    self.flowables.append(self.horizontal_line)

                elif this_item.startswith("ICON="):
                    # Insert icon indicated
                    icon_file = Path(
                        get_root_path()
                        / "resources"
                        / "icons"
                        / this_item.split("ICON=")[1]
                    )
                    self.flowables.append(Image(icon_file, 50, 50))

                else:
                    # Handle simple string
                    if this_item != "":
                        if echo_to_console:
                            print(this_item)
                        self.flowables.append(Paragraph(deEmojify(this_item), style))

            elif isinstance(this_item, plt.Figure):
                if echo_to_console:
                    this_item.show()
                self.flowables.append(fig2image(this_item))

            elif isinstance(this_item, pd.DataFrame):
                if echo_to_console:
                    print(this_item.head())
                self.flowables.append(df2table(this_item))

            else:
                # item type not recognised: ignore
                pass

        if item is None:
            # If the item is None, then do nothing
            return

        elif isinstance(item, list):
            # If the item is actually a list of items, handle each item in turn
            for i in item:
                handle_item(i)
                self.flowables.append(self.small_spacer)
        else:
            # If the item is a single item, just handle it.
            handle_item(item)

    def _generate_report(self, verbose):
        """Generate the content of a report (for console and pdf)."""
        self.flowables = []
        self.flowables.append(
            Image(get_root_path() / "resources/icons/logo.jpg", 100, 50)
        )
        self._print(f"\n")
        self._print(self._title, style=self.styles["Heading1"])
        self._print(self._doc, style=self.styles["Normal"])
        self._print(f"\n")
        self._print(current_date_and_time_as_string(), style=self.styles["Heading3"])


        self._print(f"\n")
        self._print("Files Used:", style=self.styles["Heading3"])
        for k, v in self._filenames.items():
            self._print(f"* {k}: {v}", style=self.styles["Normal"])
        self._print(f"\n")
        self._print(f"Git Commit: {get_commit_revision_number()}")
        self._print("---")

        # Determine summary outcome of the set of checks
        self._print(f"\n")
        if any(self.critical_failing_checks):
            self._print(
                "❌Some checks have failed, including some that are CRITICAL.",
                style=self.styles["Heading2"],
            )
            self._print("ICON=cross.jpg")
        elif any(self.non_critical_failing_checks):
            self._print(
                "🤷Some checks have failed, but none are CRITICAL.",
                style=self.styles["Heading2"],
            )
            self._print("ICON=shrug.jpg")
        else:
            self._print("✅All checks passed.", style=self.styles["Heading2"])
            self._print("ICON=tick.jpg")
        self._print(f"\n")
        self._print("---")

        # Print details of each check to the console
        if any(self.critical_failing_checks):
            self._print(f"\n")
            self._print("🚨CRITICAL FAILING CHECKS", style=self.styles["Heading2"])
            for f in self.critical_failing_checks:
                self._print(f"    👎FAILED: {f.name}", style=self.styles["Heading3"])
                self._print(f"({f.description})")
                self._print(
                    f.message, style=self.styles["Normal"], echo_to_console=verbose
                )
                self._print(f"\n", echo_to_console=verbose)
            self._print("---")

        if any(self.non_critical_failing_checks):
            self._print(f"\n")
            self._print(f"\n")
            self._print("🤔 NON-CRITICAL FAILING CHECKS", style=self.styles["Heading2"])
            for f in self.non_critical_failing_checks:
                self._print(f"    👎FAILED: {f.name}", style=self.styles["Heading3"])
                self._print(f"({f.description})")
                self._print(
                    f.message, style=self.styles["Normal"], echo_to_console=verbose
                )
                self._print("\n", echo_to_console=verbose)
            self._print("---")

        if any(self.passing_checks):
            self._print(f"\n")
            self._print(f"\n")
            self._print("❤️ PASSING CHECKS", style=self.styles["Heading2"])
            for f in self.passing_checks:
                self._print(f"    👍PASSED: {f.name}", style=self.styles["Heading3"])
                self._print(f"({f.description})")
                self._print(
                    f.message, style=self.styles["Normal"], echo_to_console=verbose
                )
                self._print("\n", echo_to_console=verbose)
            self._print("---")

    def report(self, filename: Optional[Path], verbose: bool):
        """Print a consolidated report the console. If `verbose=True` then details of all the failures are provided."""
        self._generate_report(verbose=verbose)

        if filename is not None:
            print(f"Writing to pdf at: {filename}: ....", end="")
            doc = SimpleDocTemplate(str(filename), pageSize=reportlab.lib.pagesizes.A4)
            doc.build(self.flowables)
            print("Done!")

add_check_report(ch_rep=None)

Add the result of a check

Source code in src/tgftools/checks.py
def add_check_report(self, ch_rep: CheckReport = None):
    """Add the result of a check"""

    # If the message is an empty list, replace it with None
    if isinstance(ch_rep.message, list) and len(ch_rep.message) == 0:
        ch_rep.message = None

    # Check that the message is of the right type (or none)
    if ch_rep.message is not None:
        item_types = (pd.DataFrame, plt.Figure, str, tuple)
        single_element = (
            ch_rep.message[0]
            if isinstance(ch_rep.message, list)
            else ch_rep.message
        )
        assert isinstance(single_element, item_types), (
            f"Message is of the wrong type {ch_rep.message=}, "
            f"{single_element=}, type: {type(single_element)}"
        )

    # Add to internal storage list of CheckReports
    self._check_reports.append(ch_rep)

report(filename, verbose)

Print a consolidated report to the console. If verbose=True then details of all the failures are provided.

Source code in src/tgftools/checks.py
def report(self, filename: Optional[Path], verbose: bool):
    """Print a consolidated report the console. If `verbose=True` then details of all the failures are provided."""
    self._generate_report(verbose=verbose)

    if filename is not None:
        print(f"Writing to pdf at: {filename}: ....", end="")
        doc = SimpleDocTemplate(str(filename), pageSize=reportlab.lib.pagesizes.A4)
        doc.build(self.flowables)
        print("Done!")

DatabaseChecks

This is the base class for the DatabaseChecks. The functions defined in the base class do the "behind the scenes" things needed to make the inherited class work.

Each function defined in the inheriting class is a check to be performed on the Database. The name of the function should be informative and the docstring should explain exactly what is being tested. In the check, assert is used to indicate what must be True for the check to pass, and an error message is provided giving information if it does not. The @critical decorator is used to label certain of the checks as 'critical'. A different overall message is given according to whether there are any failures of 'critical' checks or only failures of 'non-critical' checks.
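
An illustrative inheriting class (the check names and assertions are hypothetical); each check method receives the Database as its argument and either uses assert, returns a CheckResult, or returns nothing to indicate a pass:

from tgftools.checks import CheckResult, DatabaseChecks, critical


class MyDiseaseChecks(DatabaseChecks):
    """Checks on the MyDisease database."""

    @critical
    def model_results_have_no_negative_cases(self, db):
        """All modelled case counts must be non-negative."""
        cases = db.model_results.df.xs("cases", level="indicator")["central"]
        assert (cases >= 0).all(), "Some modelled case counts are negative."

    def partner_data_is_present(self, db):
        """Partner data should have been provided (non-critical)."""
        return CheckResult(
            passes=db.partner_data is not None,
            message="" if db.partner_data is not None else "No partner data was provided.",
        )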

Source code in src/tgftools/checks.py
class DatabaseChecks:
    """This is the base class for the DatabaseChecks.
    The functions defined in the base class do the "behind the scenes" things needed to make the inherited class work.

    Each function defined in the inheriting class is a check to be performed on the Database. The name of the function
     should be informative and the docstring should explain exactly what is being tested. In the check, `assert` is
     used to indicate what must be True for the check to pass; and an error message is provided giving information
     if it does not. The `@critical` decorator is used to label certain of the checks as being 'critical'. A different
     overall message is given according to whether there are any failures of 'critical' checks or only failures of
     'non-critical' checks.
    """

    def __init__(self, db: Database, parameters: Optional[Parameters] = None):
        self.db = db
        self.parameters = parameters
        self.ccr = ConsolidatedChecksReport(
            title=type(self).__name__,
            doc=str(self.__doc__).replace("\n", ""),
            filenames={
                "Model Results": str(self.db.model_results.path),
                "Partner Data": str(self.db.partner_data.path) if self.db.partner_data is not None else "None",
                "PF Input Data": str(self.db.pf_input_data.path) if self.db.pf_input_data is not None else "None",
            },
        )

    def _run_check(self, the_func: Callable) -> CheckReport:
        """Run a particular check and return a Check Result.
        * A `CheckResult` instance should be returned by each check, giving the result of the check and a message
        * If nothing is returned, the check is assumed to have passed.
        * If an AssertionError occurs in the test, then it is assumed the check has failed and the message in the error
          is used as the message in the check.
        """
        # Capture the static information about the check.
        header = dict(
            name=the_func.__name__,
            description=the_func.__doc__,
            is_critical=is_critical(the_func),
        )

        try:
            ch_res: CheckResult = the_func(self.db)
            if ch_res is None:
                return CheckReport(**header, passes=True, message="")
            elif isinstance(ch_res, CheckResult):
                return CheckReport(
                    **header, passes=ch_res.passes, message=ch_res.message
                )
            else:
                raise ValueError(f"Check {the_func} returned unexpected item")

        except AssertionError as assertion_error:
            message_in_assertion_error = assertion_error.args[0].split("\n")[0]
            return CheckReport(
                **header, passes=False, message=message_in_assertion_error
            )

    def run(
        self,
        suppress_error: Optional[bool] = False,
        verbose: bool = False,
        filename: Optional[Path] = None,
    ) -> bool:
        """Run all the checks that are defined in this class and returns True if all checks pass.
        A summary of the checks is printed to console. By default, any failed checks lead to an Error, but this can be
         stopped with `suppress_error`. Optionally, the results of the checks can be saved to a logfile.
        """

        # Run all the checks
        wipe()
        print(f"✨ Initiating checks {self.__class__} ✨")
        check_names = self._get_check_names()
        for ch_name in check_names:
            ch_func: Callable = self.__getattribute__(ch_name)
            print(f"Running: {ch_name} .....", end="")
            self.ccr.add_check_report(self._run_check(ch_func))
            print("Done!")

        # Report (print to console and, optionally, create pdf)
        self.ccr.report(filename=filename, verbose=verbose)

        # Determine if error should be thrown
        if self.ccr.any_fails and (not suppress_error):
            raise DataCheckError("Some checks have failed.")

        # Determine the outcome bool (True if there have been no fails)
        return not self.ccr.any_fails

    def _get_check_names(self) -> list:
        """Return the names of the checks found in the class.
        (A check is any callable defined in the class whose name is not `run` and does not begin with `_`, `XX` or `run_`.)
        """
        return sorted(
            set(
                [
                    name
                    for name in dir(self)
                    if (
                        not name.startswith("_")
                        and (not name.startswith("XX"))
                        and (not name.startswith("run_"))
                        and callable(self.__getattribute__(name))
                    )
                ]
            )
            - {"run"}
        )

run(suppress_error=False, verbose=False, filename=None)

Run all the checks that are defined in this class and return True if all checks pass. A summary of the checks is printed to console. By default, any failed checks lead to an Error, but this can be stopped with suppress_error. Optionally, the results of the checks can be saved to a logfile.
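
For example (a hedged sketch; the checks class, database and file path are assumptions carried over from the examples above):

from pathlib import Path

checks = MyDiseaseChecks(db=db, parameters=parameters)
all_passed = checks.run(
    suppress_error=True,                 # do not raise DataCheckError on failures
    verbose=True,                        # echo each check's message to the console
    filename=Path("checks_report.pdf"),  # also write the consolidated report to a pdf
)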

Source code in src/tgftools/checks.py
def run(
    self,
    suppress_error: Optional[bool] = False,
    verbose: bool = False,
    filename: Optional[Path] = None,
) -> bool:
    """Run all the checks that are defined in this class and returns True if all checks pass.
    A summary of the checks is printed to console. By default, any failed checks lead to an Error, but this can be
     stopped with `suppress_error`. Optionally, the results of the checks can be saved to a logfile.
    """

    # Run all the checks
    wipe()
    print(f"✨ Initiating checks {self.__class__} ✨")
    check_names = self._get_check_names()
    for ch_name in check_names:
        ch_func: Callable = self.__getattribute__(ch_name)
        print(f"Running: {ch_name} .....", end="")
        self.ccr.add_check_report(self._run_check(ch_func))
        print("Done!")

    # Report (print to console and, optionally, create pdf)
    self.ccr.report(filename=filename, verbose=verbose)

    # Determine if error should be thrown
    if self.ccr.any_fails and (not suppress_error):
        raise DataCheckError("Some checks have failed.")

    # Determine the outcome bool (True if there have been no fails)
    return not self.ccr.any_fails

critical(func)

Decorator used to signify that a particular check is 'critical'.

Source code in src/tgftools/checks.py
def critical(func):
    """Decorator used to signify that a particular check is a 'critical'."""

    @wraps(func)
    def wrapped(*args, **kwargs):
        return func(*args, **kwargs)

    wrapped.critical = True
    return wrapped

is_critical(func)

Returns True if the function has been decorated as @critical. From: https://stackoverflow.com/a/68583930

Source code in src/tgftools/checks.py
def is_critical(func):
    """Returns True if the function has been decorated as `@critical`.
    From: https://stackoverflow.com/a/68583930"""
    return getattr(func, "critical", False)

Analysis

This is the Analysis class. It holds a Database object and requires an argument for the scenario_descriptor. It can then output ensemble results (or country-level results) that reflect decisions for the use of the funding - in particular, when the TGF funding is non-fungible (Approach A) and when it is fungible and its allocation between countries can be optimised (Approach B). :param years_for_funding: Defines the calendar years (integers) to which the budgets correspond (i.e., the years to which the replenishment funding scenarios correspond). :param handle_out_of_bounds_costs: Determines whether an error is thrown when a result for a country is needed for a cost that is not in the range of the model_results, or whether the results for the highest/lowest-cost model_results are used instead. This is passed through to the Emulator class.
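
A minimal sketch of constructing an Analysis and producing the three kinds of projection (the database, funding and parameter objects are assumed to have been loaded elsewhere):

from tgftools.analysis import Analysis

analysis = Analysis(
    database=db,
    tgf_funding=tgf_funding,          # TgfFunding instance
    non_tgf_funding=non_tgf_funding,  # NonTgfFunding instance
    parameters=parameters,            # Parameters instance
)

# Approach A: TGF funding to each country is fixed (non-fungible)
proj_a = analysis.portfolio_projection_approach_a()

# Approach B: TGF funding can be re-allocated between countries (optimised)
proj_b = analysis.portfolio_projection_approach_b()

# Approach C: the same funding fraction applied in every country
proj_c = analysis.portfolio_projection_approach_c(funding_fraction=0.9)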

Source code in src/tgftools/analysis.py
class Analysis:
    """This is the Analysis class. It holds a Database object and requires an argument for the scenario_descriptor.
    It can then output ensemble results (or country-level results) that reflect decisions for the use of the funding -
    in particular, when the TGF funding is non-fungible (Approach A) and when it is fungible and its allocation
    between countries can be optimised (Approach B).
    :param years_for_funding: Defines the calendar years (integers) to which the budgets correspond (i.e., the years
     to which the replenishment funding scenarios correspond).
    :param handle_out_of_bounds_costs: Determines whether an error is thrown when a result for a country is needed
     for a cost that is not in the range of the model_results, or whether the results for the highest/lowest-cost
     model_results are used instead. This is passed through to the `Emulator` class.
    """

    def __init__(
        self,
        database: Database,
        tgf_funding: TgfFunding,
        non_tgf_funding: NonTgfFunding,
        parameters: Parameters,
    ):
        # Save arguments (nb, funding data are updated again later in __init__)
        self.database = database
        self.parameters = parameters
        self.tgf_funding = tgf_funding
        self.non_tgf_funding = non_tgf_funding

        # Save short-cuts to elements of the database.
        self.gp: Gp = database.gp
        self.countries = database.model_results.countries

        # Store some parameters for easy access
        self.disease_name = self.database.disease_name
        self.indicators = self.parameters.get_indicators_for(self.disease_name)
        self.scenario_descriptor = parameters.get('SCENARIO_DESCRIPTOR_FOR_IC')
        self.handle_out_of_bounds_costs = parameters.get('HANDLE_OUT_OF_BOUNDS_COSTS')
        self.innovation_on = parameters.get('INNOVATION_ON')
        self.years_for_funding = self.parameters.get('YEARS_FOR_FUNDING')
        self.indicators_for_adj_for_innovations = self.parameters.get(self.disease_name).get(
            'INDICATORS_FOR_ADJ_FOR_INNOVATIONS')
        self.EXPECTED_GP_SCENARIO = self.parameters.get_gpscenario().index.to_list()

        # Filter funding assumptions for countries that are not modelled
        self.tgf_funding = self.filter_funding_data_for_non_modelled_countries(self.tgf_funding)
        self.non_tgf_funding = self.filter_funding_data_for_non_modelled_countries(self.non_tgf_funding)

        # If we should remove the dominated points, edit the model results accordingly:
        if self.parameters.get('REMOVE_DOMINATED_POINTS'):
            self.database.model_results = filter_for_frontier(
                model_results=self.database.model_results,
                scenario_descriptor=self.scenario_descriptor,
                years_for_obj_func=parameters.get("YEARS_FOR_OBJ_FUNC"),
                years_for_funding=parameters.get("YEARS_FOR_FUNDING"),
            )

        # Create emulators for each country so that results can be created for any cost (within the range of actual
        # results).
        self.emulators: dict = {
            c: Emulator(
                database=self.database,
                scenario_descriptor=self.scenario_descriptor,
                country=c,
                years_for_funding=self.years_for_funding,
                handle_out_of_bounds_costs=self.handle_out_of_bounds_costs,
            )
            for c in self.countries
        }

    def filter_funding_data_for_non_modelled_countries(
            self, funding_data_object: TgfFunding | NonTgfFunding
    ) -> TgfFunding | NonTgfFunding:
        """Returns a funding data object that has been filtered for countries that are not declared as the modelled
        countries for that disease."""
        list_of_modelled_countries = self.parameters.get_modelled_countries_for(self.disease_name)
        funding_data_object = copy(funding_data_object)
        funding_data_object.df = funding_data_object.df[funding_data_object.df.index.isin(list_of_modelled_countries)]
        return funding_data_object

    def portfolio_projection_approach_a(self) -> PortfolioProjection:
        """Returns the PortfolioProjection For Approach A: i.e., the projection for each country, given the funding
        to each country when the TGF funding allocated to a country CANNOT be changed.
        """

        country_results = self._get_country_projections_given_funding_dollar_amounts(
            total_funding_by_country=(
                self.tgf_funding.df["value"] + self.non_tgf_funding.df["value"]
            ).to_dict()
        )
        return PortfolioProjection(
            tgf_funding_by_country=self.tgf_funding.df["value"].to_dict(),
            non_tgf_funding_by_country=self.non_tgf_funding.df["value"].to_dict(),
            country_results=country_results,
            portfolio_results=self._make_portfolio_results(
                country_results=country_results,
                adjust_for_unmodelled_innovation=self.innovation_on,
                name='none',
            ),
        )

    def portfolio_projection_approach_b(
        self,
    ) -> PortfolioProjection:
        """Returns the PortfolioProjection For Approach B: i.e., the projection for each country, given the funding
        to each country when the TGF funding allocated to a country _CAN_ be changed. Multiple methods for optimisation
        may be tried, but only a single result is provided (that of the best solution found.)
        :param methods: List of methods to use in approach_b (For method see `do_approach_b`)
        :param optimisation_params: Dict of parameters specifying how to construct the optimisation.
        See `_get_data_frames_for_approach_b`
        """
        # Use the `ApproachB` class to get the TGF funding allocations from the optimisation, getting only the best
        # result.

        methods = self.parameters.get('APPROACH_B_METHODS')

        results_from_approach_b = self._approach_b().do_approach_b(
            methods=methods, provide_best_only=True
        )
        tgf_funding_under_approach_b = results_from_approach_b.tgf_budget_by_country

        country_results = self._get_country_projections_given_funding_dollar_amounts(
            (
                pd.Series(tgf_funding_under_approach_b)
                + self.non_tgf_funding.df["value"]
            ).to_dict()
        )
        return PortfolioProjection(
            tgf_funding_by_country=tgf_funding_under_approach_b,
            non_tgf_funding_by_country=self.non_tgf_funding.df["value"].to_dict(),
            country_results=country_results,
            portfolio_results=self._make_portfolio_results(
                country_results=country_results,
                adjust_for_unmodelled_innovation=self.innovation_on,
                name='none',
            )
        )


    def portfolio_projection_approach_c(self, funding_fraction: float) -> PortfolioProjection:
        """Returns the PortfolioProjection For Approach C: i.e., the funding fraction is the same in all countries
        """
        country_results = self._get_country_projection_given_funding_fraction(funding_fraction=funding_fraction)
        return PortfolioProjection(
            tgf_funding_by_country=None,  # In this scenario, we do not know the split between TGF and non-TGF sources
            non_tgf_funding_by_country=None,
            country_results=country_results,
            portfolio_results=self._make_portfolio_results(
                country_results=country_results,
                adjust_for_unmodelled_innovation=self.innovation_on,
                name='none',
            ),
        )


    def portfolio_projection_counterfactual(
            self,
            name: str,
    ) -> PortfolioProjection:
        """Returns a PortfolioProjection for a chosen counterfactual scenario."""

        assert name in self.database.model_results.df.index.get_level_values('scenario_descriptor'),\
            f"Counterfactual {name} not found in model results."

        # Create dict of country_results corresponding to the counterfactual scenario
        country_results = dict()
        for country in self.countries:

            model_projection = {
                indicator:
                    self.database.model_results.df.loc[(name, slice(None), country, slice(None), indicator)]
                    .droplevel(axis=0, level='funding_fraction')
                    .rename(columns={'central': 'model_central', 'low': 'model_low', 'high': 'model_high'})
                for indicator in self.database.model_results.indicators
            }

            country_results[country] = CountryProjection(
                model_projection=model_projection,
                funding=float('nan'),
            )

        return PortfolioProjection(
            tgf_funding_by_country={k: float('nan') for k in self.countries},
            non_tgf_funding_by_country={k: float('nan') for k in self.countries},
            country_results=country_results,
            portfolio_results=self._make_portfolio_results(country_results, adjust_for_unmodelled_innovation=False, name=name),
        )

    def dump_everything_to_xlsx(
            self,
            filename: Path,
    ) -> None:
        """Dump everything into an Excel file."""
        DumpAnalysisToExcel(self, filename)

    def _approach_b(self) -> ApproachB:
        """Returns the object `ApproachB` so that other features of it can be accessed conveniently."""
        return ApproachB(**self.get_data_frames_for_approach_b())

    def get_data_frames_for_approach_b(
        self,
    ) -> Dict[str, pd.DataFrame]:
        """Returns dict of dataframes needed for using the `ApproachB` class. This is where the quantities are
        computed that summarise the performance of each country under each funding_fraction and the GP, which forms
        the basis of the optimisation."""

        # ---------------
        # Get parameters:
        force_monotonic_decreasing = self.parameters.get("FORCE_MONOTONIC_DECREASING")
        years_for_obj_func = self.parameters.get("YEARS_FOR_OBJ_FUNC")
        # ---------------

        # get budgets as data-frames
        tgf_budgets = self.tgf_funding.df["value"].reset_index()
        non_tgf_budgets = self.non_tgf_funding.df["value"].reset_index()

        # Create Model Results df: country|cases|deaths|cost (multiple row per country, one for each costing value),
        # with...
        # * cost being the sums of cost within the years specified by `years_for_funding` (i.e. the years of the
        #   replenishment).
        # * cases and death being sums within the years specified by `years_for_obj_func` (i.e. the period over which
        #   we wish to "compete" the different funding allocations).

        # Summarise cases/death for each funding_fraction: sums within  `years_for_obj_func`
        cases_and_deaths = (
            self.database.model_results.df.loc[
                (
                    self.scenario_descriptor,
                    slice(None),
                    slice(None),
                    years_for_obj_func,
                    ["cases", "deaths"],
                )
            ]["central"]
            .groupby(axis=0, level=["funding_fraction", "country", "indicator"])
            .sum()
            .unstack("indicator")
        )

        # Summarise cost for each funding_fraction: sums within `self.years_for_funding`
        costs = (
            self.database.model_results.df.loc[
                (
                    self.scenario_descriptor,
                    slice(None),
                    slice(None),
                    self.years_for_funding,
                    ["cost"],
                )
            ]["central"]
            .groupby(axis=0, level=["funding_fraction", "country", "indicator"])
            .sum()
            .unstack("indicator")
        )

        # join these two dataframes:
        model_results = cases_and_deaths.join(costs).reset_index().sort_values(["country", "cost"]).reset_index(drop=True)

        # handle_out_of_bounds_costs: Insert a set of records for zero funding with the same results as for 10% funding,
        # and a set of records with float('inf') costs with the same results as the highest funding level.
        if self.handle_out_of_bounds_costs:
            zero_funding_records = model_results.loc[model_results['funding_fraction'] == 0.1].copy()
            zero_funding_records['funding_fraction'] = 0.0
            zero_funding_records['cost'] = 0.0

            inf_funding_records = model_results.loc[model_results['funding_fraction'] == 1.0].copy()
            inf_funding_records['funding_fraction'] = float('inf')
            inf_funding_records['cost'] = float('inf')

            model_results = pd.concat([model_results, zero_funding_records, inf_funding_records], axis=0).sort_values(["country", "cost"]).reset_index(drop=True)

        # force_monotonic_decreasing: Within the results for each country, force that cases and deaths are
        #  monotonically decreasing with costs.
        if force_monotonic_decreasing:
            for country in model_results.country.unique():
                raw_sorted_on_cost = model_results.loc[model_results['country'] == country, ['cost', 'deaths', 'cases']].set_index('cost').sort_index(ascending=True)
                model_results.loc[model_results['country'] == country, 'cost'] = raw_sorted_on_cost.index.values
                model_results.loc[model_results['country'] == country, 'cases'] = raw_sorted_on_cost['cases'].cummin().values
                model_results.loc[model_results['country'] == country, 'deaths'] = raw_sorted_on_cost['deaths'].cummin().values

        # Tidy-up (sort and drop any duplicates)
        model_results = model_results.reset_index() \
                                     .drop(columns=["funding_fraction"]) \
                                     .drop_duplicates(subset=['country', 'cost']) \
                                     .sort_values(["country", "cost"])[["country", "cost", "cases", "deaths"]]

        return {
            "tgf_budgets": tgf_budgets,
            "non_tgf_budgets": non_tgf_budgets,
            "model_results": model_results,
        }

    def _get_country_projections_given_funding_dollar_amounts(
        self, total_funding_by_country: Dict[str, float]
    ) -> Dict[str, CountryProjection]:
        """Returns a dict of CountryProjections given specified total funding dollar amounts to each country."""

        # Collect results for each country
        country_results = dict()
        for country, total_dollar_funding in total_funding_by_country.items():

            if country not in self.countries:
                # Skip a country that is included in the funding data but not included in the model results
                continue

            model_projection = self.emulators[country].get(
                dollars=total_dollar_funding,
            )

            country_projection = CountryProjection(
                model_projection=model_projection,
                funding=total_dollar_funding,
            )
            country_results[country] = country_projection

        return country_results

    def _get_country_projection_given_funding_fraction(self, funding_fraction: float) -> Dict[str, CountryProjection]:
        """Returns a dict of CountryProjections given a specified funding_fraction, which is the same in all countries"""
        country_results = dict()
        for country in self.countries:
            model_projection = self.emulators[country].get(
                funding_fraction=funding_fraction,
            )
            country_projection = CountryProjection(
                model_projection=model_projection,
                funding=None,  # could find this from self.emulators[country]._lookup_dollars_to_funding_fraction[1.0]
            )
            country_results[country] = country_projection
        return country_results

    def _make_portfolio_results(
            self,
            country_results: Dict[str, CountryProjection],
            adjust_for_unmodelled_innovation: bool,
            name: str,
    ) -> Dict[str, pd.DataFrame]:
        """ This function generates portfolio level results. This included summing up variables across countries,
        scaling up for non-modelled countries, and doing the adjustment for GP-related innovation. """

        actual_without_innovation = (
            self._scale_up_for_non_modelled_countries(
                self._summing_up_countries(country_results, name), name
            )
        )

        if not adjust_for_unmodelled_innovation:
            return actual_without_innovation

        else:
            # Get the fully funded version of the model output
            scenario_that_represents_full_impact_including_innovation = self.parameters.get('SCENARIO_THAT_REPRESENTS_FULL_IMPACT_INCLUDING_INNOVATION')
            full_funding_without_innovation = self.portfolio_projection_counterfactual(scenario_that_represents_full_impact_including_innovation)

            return (
                self._adj_for_innovations(
                    actual_without_innovation=actual_without_innovation,
                    full_funding_without_innovation=full_funding_without_innovation,
                    gp=self.gp,
                )
            )

    def _scale_up_for_non_modelled_countries(self, country_results: Dict[str, CountryProjection], name: str) -> Dict[str, pd.DataFrame]:
        """ This scales the modelled results to non-modelled countries for the epi indicators. """

        # Get the first year of the model and list of epi indicators
        first_year = self.parameters.get("START_YEAR")
        if name == ('GP'):
            first_year = self.parameters.get(self.disease_name).get("GP_START_YEAR")

        # Get the indicators that should be scaled
        indicator_list = self.parameters.get_indicators_for(self.disease_name).use_scaling
        indicator_list = pd.DataFrame(indicator_list).reset_index()
        indicator_list = indicator_list.loc[indicator_list['use_scaling'] == True]
        indicator_list = indicator_list['name'].tolist()

        # Filter partner data to the corresponding year and epi indicators, summed across countries and turned
        # into a dictionary
        df_partner = self.database.partner_data.df.loc[(self.scenario_descriptor, slice(None), first_year, indicator_list)].groupby(axis=0, level='indicator').sum()['central'].to_dict()

        # This loop scales all epi indicators to non-modelled countries
        adj_results_portfolio = dict()
        for indicator, df in country_results.items():
            if indicator not in indicator_list:
                adj_results_portfolio[indicator] = df
            else:
                # do scaling:
                adj_results_portfolio[indicator] = df * (df_partner[indicator] / df.at[first_year, 'model_central'])

        return adj_results_portfolio

    def _adj_for_innovations(
            self,
            actual_without_innovation: Dict[str, CountryProjection],
            full_funding_without_innovation: Dict[str, CountryProjection],
            gp: Gp,
    ) -> Dict[str, pd.DataFrame]:
        """ This will make the necessary adjustments for innovations assumed to come in within the partner GP. """


        sigmoid_scaling = pd.Series(
            dict(zip(
                range(self.parameters.get('START_YEAR'), self.parameters.get('END_YEAR') + 1),
                self.parameters.get(self.disease_name).get("NEW_INNOVATIONS_SCALING_FACTORS")
            ))
        )

        INDICATORS_FOR_ADJ_FOR_INNOVATIONS = self.indicators_for_adj_for_innovations

        adj_country_results = dict()

        for indicator, df in actual_without_innovation.items():

            if indicator not in INDICATORS_FOR_ADJ_FOR_INNOVATIONS:
                # Do not do any adjustment
                adj_country_results[indicator] = df.copy()

            else:
                # Do the adjustment for new innovations

                # Set first year of model output
                expected_first_year = self.parameters.get("START_YEAR")

                # Work out correction needed for non-modelled innovations:
                full_funding = full_funding_without_innovation.portfolio_results[indicator]
                _gp_df = gp.df.reset_index()
                _gp = _gp_df.loc[(_gp_df.indicator == indicator), ['year', 'central']].set_index('year')['central']
                _gp = _gp[_gp.index >= expected_first_year]  # Ensure all dfs have same length
                step_one = (df / full_funding).mul(_gp, axis=0)
                step_two = df - (df - step_one).mul(sigmoid_scaling, axis=0)

                # Over-write the lower and upper bounds so they are the same distance as the modelled distance before applying the sigmoidal adjustment
                # If not, the lower bounds and upper bounds can behave strangely

                # First capture the distance from central to LB and Ub from unadjusted
                distance_low  = df['model_central'] - df['model_low']
                distance_upper = df['model_high'] - df['model_central']

                step_two['model_low'] = step_two['model_central'] - distance_low
                step_two['model_high'] = step_two['model_central'] + distance_upper

                # Add adjusted time series to the dataframe
                adj_country_results[indicator] = step_two

        return adj_country_results

    def _summing_up_countries(self, country_results: Dict[str, CountryProjection], name: str) -> Dict[str, pd.DataFrame]:
        """ This will sum up all the country results to get the portolfio-level results. This will use the adjusted
        country results and be used to generate uncertainty. """

        def _compute_mean_and_ci(_df_for_year: pd.DataFrame):
            """This helper function accepts a dataframe for one year of model results for each country, and returns a
            dict summarising the statistic across the countries, as a mean low/high range.
            """
            model_central = _df_for_year["model_central"].sum()

            # Then we do the SDs. CAUTION: for the first SD it has to be 1.96, as the assumption is that the model
            # LB and UB correspond to 95% confidence intervals.
            _sds = ((_df_for_year.model_high - _df_for_year.model_low) / (2 * 1.96)).values
            sd_for_year = (
                                  matmul(_sds).sum() * rho_btw_countries
                                  + (_sds ** 2).sum() * (1 - rho_btw_countries)
                          ) ** 0.5
            model_low = max(0, (model_central - z_value * sd_for_year))
            model_high = model_central + z_value * sd_for_year
            return {
                'model_central': model_central,
                'model_low': model_low,
                'model_high': model_high
            }

        # Define years and parameters we need
        p = self.parameters
        first_year = p.get("START_YEAR")
        if name == ('GP'):
            first_year = self.parameters.get(self.disease_name).get("GP_START_YEAR")
        last_year = p.get("END_YEAR")
        z_value = p.get("Z_VALUE")
        rho_btw_countries = p.get("RHO_BETWEEN_COUNTRIES_WITHIN_DISEASE")

        portfolio_results = dict()

        # Defining the list of indicators and countries for the loop
        indicators = country_results[list(country_results.keys())[0]].model_projection.keys()
        types_lookup = self.indicators['type'].to_dict()

        countries = country_results.keys()
        # Extracting all values for each indicator across all countries, if we should do an aggregation
        for indicator in indicators:
            type_of_indicator_is_count = types_lookup[indicator] == 'count'

            if not type_of_indicator_is_count:
                # Do nothing if the indicator is not aggregating arithmetically (i.e., is a count).
                continue

            dfs = list()
            for country in countries:
                dfs.append(
                    country_results[country].model_projection[indicator].loc[
                        slice(first_year, last_year),
                        ['model_central', 'model_high', 'model_low']
                    ]
                )

            # Put all the values for a given indicator together into one df
            all_dfs = pd.concat(dfs)

            # Aggregate by year:
            _res = dict()
            for year in range(first_year, last_year + 1):
                _res[year] = all_dfs.loc[year].pipe(_compute_mean_and_ci)

            portfolio_results[indicator] = pd.DataFrame(_res).T

        return portfolio_results

    def get_partner(self) -> pd.DataFrame:
        """Returns data-frame of the partner data that are needed for reporting."""

        if self.disease_name == 'HIV':
            indicator_partner = ['cases', 'deaths', 'hivneg', 'population']
        if self.disease_name == 'TB':
            indicator_partner = ['cases', 'deaths', 'deathshivneg', 'population']
        if self.disease_name == 'MALARIA':
            indicator_partner = ['cases', 'deaths', 'par']

        expected_first_year = self.parameters.get("GRAPH_FIRST_YEAR") - 5
        expected_last_year = self.parameters.get("START_YEAR") + 1

        partner_data = self.database.partner_data.df.loc[
            (self.scenario_descriptor, slice(None), range(expected_first_year, expected_last_year), indicator_partner)].groupby(axis=0, level=['year', 'indicator'])['central'].sum().unstack()

        return partner_data

    def get_gp(self) -> pd.DataFrame:
        """Returns data-frame of the GP elements that are needed for reporting."""

        if self.disease_name != 'HIV':
            gp_data = self.database.gp.df['central'].unstack()
        else:
            # Get GP for HIV
            gp_data = self.portfolio_projection_counterfactual(self.EXPECTED_GP_SCENARIO[0])

            # Convert to the same format as other diseases
            gp_data = gp_data.portfolio_results
            gp_data = pd.concat(gp_data, axis=0).reset_index(level=0).rename({'level_0': 'key'}, axis=1)
            gp_data = gp_data.drop(['model_low', 'model_high'], axis=1)
            gp_data = gp_data.pivot(columns='key', values='model_central')

        return gp_data

    def get_counterfactual_lives_saved_malaria(self) -> pd.DataFrame:
        """ Return the CF time series to compute lives saved for malaria"""

        if self.disease_name != "MALARIA":
            return pd.DataFrame()

        # Get partner mortality data
        mortality_partner_data = self.database.partner_data.df.loc[
            (self.scenario_descriptor, slice(None), 2000, "mortality"), "central"
        ].droplevel(axis=0, level=["scenario_descriptor", "year", "indicator"])
        # The year 2000 is hard-coded because it is intrinsic to the analysis.

        # TODO: make mean of funding fractions?
        # Set years of model output
        expected_first_year = self.parameters.get("START_YEAR")
        expected_last_year = self.parameters.get("END_YEAR")

        # First adjust model data to baseline partner data

        # Get the model estimates for par for IC scenario and generate mean across funding fractions
        par_model_data = self.database.model_results.df.loc[
            (self.scenario_descriptor, slice(None), slice(None), range(expected_first_year, expected_last_year), "par"), "central"
        ].groupby(axis=0, level=['country', 'year']).mean().unstack()

        # Then get the estimates from baseline from model and partner data to compute adjustment ratio
        par_firstyear_partner_data = self.database.partner_data.df.loc[
            (self.scenario_descriptor, slice(None), expected_first_year, "par"), "central"
        ].droplevel(axis=0, level=["scenario_descriptor", "year", "indicator"])

        par_firstyear_model_data = self.database.model_results.df.loc[
            (self.scenario_descriptor, 1, slice(None), expected_first_year, "par"), "central"
        ].droplevel(axis=0, level=["scenario_descriptor", "funding_fraction", "year", "indicator"])

        ratio = par_firstyear_partner_data / par_firstyear_model_data

        # Compute modelled par that have been adjusted to baseline partner data
        adj_par_data_model = par_model_data.mul(ratio, axis=0)

        # Now compute deaths from the above as a CF time series for lives saved
        adjusted_mortality = adj_par_data_model.mul(mortality_partner_data, axis=0)
        adjusted_mortality_total = adjusted_mortality.sum(axis=0)
        adjusted_mortality_total.index = adjusted_mortality_total.index.astype(int)

        return adjusted_mortality_total

    def get_counterfactual_infections_averted_malaria(self) -> pd.DataFrame:
        """ Return the CF time series to compute infections averted for malaria"""

        if self.disease_name != "MALARIA":

            return pd.DataFrame()

        # Get partner mortality data
        # Set first year of model output
        expected_first_year = self.parameters.get("START_YEAR")
        expected_last_year = self.parameters.get("END_YEAR")

        incidence_partner_data = self.database.partner_data.df.loc[
            (self.scenario_descriptor, slice(None), expected_first_year, "incidence"), "central"
        ].droplevel(axis=0, level=["scenario_descriptor", "year", "indicator"])

        # TODO: make mean of funding fractions?
        # First adjust model data to baseline partner data
        # Get the model estimates for par for IC scenario and generate mean across funding fractions
        par_model_data = self.database.model_results.df.loc[
            (self.scenario_descriptor, slice(None), slice(None), range(expected_first_year, expected_last_year), "par"), 'central'
        ].groupby(axis=0, level=['country', 'year']).mean().unstack()

        # Then get the estimates from baseline from model and partner data to compute adjustment ratio
        par_firstyear_partner_data = self.database.partner_data.df.loc[
            (self.scenario_descriptor, slice(None), expected_first_year, "par"), "central"
        ].droplevel(axis=0, level=["scenario_descriptor", "year", "indicator"])

        par_firstyear_model_data = self.database.model_results.df.loc[
            (self.scenario_descriptor, 1, slice(None), expected_first_year, "par"), "central"
        ].droplevel(axis=0, level=["scenario_descriptor", "funding_fraction", "year", "indicator"])

        ratio = par_firstyear_partner_data / par_firstyear_model_data

        # Compute modelled par that have been adjusted to baseline partner data
        adj_par_data_model = par_model_data.mul(ratio, axis=0)

        # Now compute infections from the above as a CF time series for infections averted
        adjusted_incidence = adj_par_data_model.mul(incidence_partner_data, axis=0)
        adjusted_incidence_total = adjusted_incidence.sum(axis=0)
        adjusted_incidence_total.index = adjusted_incidence_total.index.astype(int)

        return adjusted_incidence_total

    def make_diagnostic_report(
            self,
            plt_show: Optional[bool] = False,
            filename: Optional[Path] = None,
    ):
        """
        Create a report that compares the results from Approach A and B (and alternative optimisation methods for
        Approach B if these are specified).
        :param plt_show: determines whether to show the plot
        :param filename: filename to save the report to
        """
        # Create the approach_b object
        approach_b_object = self._approach_b()

        # Run the report, specifying whether to plot graphs, the filename, and passing through any other kwargs
        # Suppress the returned results as the purpose of this function is generating the report.
        _ = approach_b_object.run(
            plt_show=plt_show,
            filename=filename,
            methods=self.parameters.get('APPROACH_B_METHODS'),
            provide_best_only=False,
        )

dump_everything_to_xlsx(filename)

Dump everything into an Excel file.

Source code in src/tgftools/analysis.py
def dump_everything_to_xlsx(
        self,
        filename: Path,
) -> None:
    """Dump everything into an Excel file."""
    DumpAnalysisToExcel(self, filename)
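
As a minimal usage sketch (assuming `analysis` is an already-constructed Analysis object for one disease; the output path is purely illustrative):

from pathlib import Path

# Write all inputs and results held by this analysis into a single workbook (illustrative path)
analysis.dump_everything_to_xlsx(filename=Path("outputs/hiv_analysis_dump.xlsx"))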

filter_funding_data_for_non_modelled_countries(funding_data_object)

Returns a funding data object that has been filtered to remove countries that are not declared as modelled countries for that disease.

Source code in src/tgftools/analysis.py
def filter_funding_data_for_non_modelled_countries(
        self, funding_data_object: TgfFunding | NonTgfFunding
) -> TgfFunding | NonTgfFunding:
    """Returns a funding data object that has been filtered for countries that are not declared as the modelled
    countries for that disease."""
    list_of_modelled_countries = self.parameters.get_modelled_countries_for(self.disease_name)
    funding_data_object = copy(funding_data_object)
    funding_data_object.df = funding_data_object.df[funding_data_object.df.index.isin(list_of_modelled_countries)]
    return funding_data_object
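
The filtering step itself is a simple index-based selection. A standalone sketch of the same idea (the country codes and values here are purely illustrative):

import pandas as pd

funding_df = pd.DataFrame(
    {"value": [100.0, 250.0, 75.0]},
    index=["AAA", "BBB", "CCC"],  # ISO3-style codes, purely illustrative
)
modelled_countries = ["AAA", "CCC"]

# Keep only the rows whose country code is in the list of modelled countries
filtered = funding_df[funding_df.index.isin(modelled_countries)]
print(filtered)  # rows for AAA and CCC only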

get_counterfactual_infections_averted_malaria()

Return the CF time series to compute infections averted for malaria

Source code in src/tgftools/analysis.py
def get_counterfactual_infections_averted_malaria(self) -> pd.DataFrame:
    """ Return the CF time series to compute infections averted for malaria"""

    if self.disease_name != "MALARIA":

        return pd.DataFrame()

    # Get partner mortality data
    # Set first year of model output
    expected_first_year = self.parameters.get("START_YEAR")
    expected_last_year = self.parameters.get("END_YEAR")

    incidence_partner_data = self.database.partner_data.df.loc[
        (self.scenario_descriptor, slice(None), expected_first_year, "incidence"), "central"
    ].droplevel(axis=0, level=["scenario_descriptor", "year", "indicator"])

    # TODO: make mean of funding fractions?
    # First adjust model data to baseline partner data
    # Get the model estimates for par for IC scenario and generate mean across funding fractions
    par_model_data = self.database.model_results.df.loc[
        (self.scenario_descriptor, slice(None), slice(None), range(expected_first_year, expected_last_year), "par"), 'central'
    ].groupby(axis=0, level=['country', 'year']).mean().unstack()

    # Then get the estimates from baseline from model and partner data to compute adjustment ratio
    par_firstyear_partner_data = self.database.partner_data.df.loc[
        (self.scenario_descriptor, slice(None), expected_first_year, "par"), "central"
    ].droplevel(axis=0, level=["scenario_descriptor", "year", "indicator"])

    par_firstyear_model_data = self.database.model_results.df.loc[
        (self.scenario_descriptor, 1, slice(None), expected_first_year, "par"), "central"
    ].droplevel(axis=0, level=["scenario_descriptor", "funding_fraction", "year", "indicator"])

    ratio = par_firstyear_partner_data / par_firstyear_model_data

    # Compute modelled par that have been adjusted to baseline partner data
    adj_par_data_model = par_model_data.mul(ratio, axis=0)

    # Now compute infections from the above as a CF time series for infections averted
    adjusted_incidence = adj_par_data_model.mul(incidence_partner_data, axis=0)
    adjusted_incidence_total = adjusted_incidence.sum(axis=0)
    adjusted_incidence_total.index = adjusted_incidence_total.index.astype(int)

    return adjusted_incidence_total

get_counterfactual_lives_saved_malaria()

Return the CF time series to compute lives saved for malaria

Source code in src/tgftools/analysis.py
def get_counterfactual_lives_saved_malaria(self) -> pd.DataFrame:
    """ Return the CF time series to compute lives saved for malaria"""

    if self.disease_name != "MALARIA":
        return pd.DataFrame()

    # Get partner mortality data
    mortality_partner_data = self.database.partner_data.df.loc[
        (self.scenario_descriptor, slice(None), 2000, "mortality"), "central"
    ].droplevel(axis=0, level=["scenario_descriptor", "year", "indicator"])
    # The year 2000 is hard-coded because it is intrinsic to the analysis.

    # TODO: make mean of funding fractions?
    # Set years of model output
    expected_first_year = self.parameters.get("START_YEAR")
    expected_last_year = self.parameters.get("END_YEAR")

    # First adjust model data to baseline partner data

    # Get the model estimates for par for IC scenario and generate mean across funding fractions
    par_model_data = self.database.model_results.df.loc[
        (self.scenario_descriptor, slice(None), slice(None), range(expected_first_year, expected_last_year), "par"), "central"
    ].groupby(axis=0, level=['country', 'year']).mean().unstack()

    # Then get the estimates from baseline from model and partner data to compute adjustment ratio
    par_firstyear_partner_data = self.database.partner_data.df.loc[
        (self.scenario_descriptor, slice(None), expected_first_year, "par"), "central"
    ].droplevel(axis=0, level=["scenario_descriptor", "year", "indicator"])

    par_firstyear_model_data = self.database.model_results.df.loc[
        (self.scenario_descriptor, 1, slice(None), expected_first_year, "par"), "central"
    ].droplevel(axis=0, level=["scenario_descriptor", "funding_fraction", "year", "indicator"])

    ratio = par_firstyear_partner_data / par_firstyear_model_data

    # Compute modelled par that have been adjusted to baseline partner data
    adj_par_data_model = par_model_data.mul(ratio, axis=0)

    # Now compute deaths from the above as a CF time series for lives saved
    adjusted_mortality = adj_par_data_model.mul(mortality_partner_data, axis=0)
    adjusted_mortality_total = adjusted_mortality.sum(axis=0)
    adjusted_mortality_total.index = adjusted_mortality_total.index.astype(int)

    return adjusted_mortality_total
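
The core arithmetic is: scale the modelled population-at-risk ('par') by the ratio of partner-to-model par in the first model year, multiply by the year-2000 partner mortality rate, and sum over countries to get one counterfactual value per year. A toy sketch of that calculation (all numbers made up):

import pandas as pd

# Modelled par by country (rows) and year (columns); values are purely illustrative
par_model = pd.DataFrame(
    {2024: [1_000.0, 2_000.0], 2025: [1_100.0, 2_100.0]},
    index=["AAA", "BBB"],
)
par_partner_firstyear = pd.Series({"AAA": 1_200.0, "BBB": 1_800.0})
par_model_firstyear = pd.Series({"AAA": 1_000.0, "BBB": 2_000.0})
mortality_rate_2000 = pd.Series({"AAA": 0.002, "BBB": 0.001})

# Adjust the modelled par to the partner baseline, then form the counterfactual deaths series
ratio = par_partner_firstyear / par_model_firstyear
adjusted_par = par_model.mul(ratio, axis=0)
cf_deaths_by_year = adjusted_par.mul(mortality_rate_2000, axis=0).sum(axis=0)
print(cf_deaths_by_year)  # one value per year (2024, 2025)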

get_data_frames_for_approach_b()

Returns a dict of dataframes needed for using the ApproachB class. This is where the quantities that summarise the performance of each country under each funding_fraction and the GP are computed; these form the basis of the optimisation.

Source code in src/tgftools/analysis.py
def get_data_frames_for_approach_b(
    self,
) -> Dict[str, pd.DataFrame]:
    """Returns dict of dataframes needed for using the `ApproachB` class. This is where the quantities are
    computed that summarises the performance of each country under each funding_fraction and the GP, which forms
    the basis of the optimisation."""

    # ---------------
    # Get parameters:
    force_monotonic_decreasing = self.parameters.get("FORCE_MONOTONIC_DECREASING")
    years_for_obj_func = self.parameters.get("YEARS_FOR_OBJ_FUNC")
    # ---------------

    # get budgets as data-frames
    tgf_budgets = self.tgf_funding.df["value"].reset_index()
    non_tgf_budgets = self.non_tgf_funding.df["value"].reset_index()

    # Create Model Results df: country|cases|deaths|cost (multiple rows per country, one for each costing value),
    # with...
    # * cost being the sums of cost within the years specified by `years_for_funding` (i.e. the years of the
    #   replenishment).
    # * cases and deaths being sums within the years specified by `years_for_obj_func` (i.e. the period over which
    #   we wish to "compete" the different funding allocations).

    # Summarise cases/deaths for each funding_fraction: sums within `years_for_obj_func`
    cases_and_deaths = (
        self.database.model_results.df.loc[
            (
                self.scenario_descriptor,
                slice(None),
                slice(None),
                years_for_obj_func,
                ["cases", "deaths"],
            )
        ]["central"]
        .groupby(axis=0, level=["funding_fraction", "country", "indicator"])
        .sum()
        .unstack("indicator")
    )

    # Summarise cost for each funding_fraction: sums within `self.years_for_funding`
    costs = (
        self.database.model_results.df.loc[
            (
                self.scenario_descriptor,
                slice(None),
                slice(None),
                self.years_for_funding,
                ["cost"],
            )
        ]["central"]
        .groupby(axis=0, level=["funding_fraction", "country", "indicator"])
        .sum()
        .unstack("indicator")
    )

    # join these two dataframes:
    model_results = cases_and_deaths.join(costs).reset_index().sort_values(["country", "cost"]).reset_index(drop=True)

    # `handle_out_of_bounds_costs`: insert a set of records for zero funding with the same results as for 10% funding,
    # and a set of records with float('inf') costs with the same results as the highest funding level.
    if self.handle_out_of_bounds_costs:
        zero_funding_records = model_results.loc[model_results['funding_fraction'] == 0.1].copy()
        zero_funding_records['funding_fraction'] = 0.0
        zero_funding_records['cost'] = 0.0

        inf_funding_records = model_results.loc[model_results['funding_fraction'] == 1.0].copy()
        inf_funding_records['funding_fraction'] = float('inf')
        inf_funding_records['cost'] = float('inf')

        model_results = pd.concat([model_results, zero_funding_records, inf_funding_records], axis=0).sort_values(["country", "cost"]).reset_index(drop=True)

    # `force_monotonic_decreasing`: within the results for each country, force cases and deaths to be
    # monotonically decreasing with cost.
    if force_monotonic_decreasing:
        for country in model_results.country.unique():
            raw_sorted_on_cost = model_results.loc[model_results['country'] == country, ['cost', 'deaths', 'cases']].set_index('cost').sort_index(ascending=True)
            model_results.loc[model_results['country'] == country, 'cost'] = raw_sorted_on_cost.index.values
            model_results.loc[model_results['country'] == country, 'cases'] = raw_sorted_on_cost['cases'].cummin().values
            model_results.loc[model_results['country'] == country, 'deaths'] = raw_sorted_on_cost['deaths'].cummin().values

    # Tidy-up (sort and drop any duplicates)
    model_results = model_results.reset_index() \
                                 .drop(columns=["funding_fraction"]) \
                                 .drop_duplicates(subset=['country', 'cost']) \
                                 .sort_values(["country", "cost"])[["country", "cost", "cases", "deaths"]]

    return {
        "tgf_budgets": tgf_budgets,
        "non_tgf_budgets": non_tgf_budgets,
        "model_results": model_results,
    }
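
The force-monotonic-decreasing step can be illustrated in isolation: for each country, results are sorted by cost and cases/deaths are replaced by their running minimum, so that more funding can never appear to produce worse outcomes. A toy sketch (numbers made up):

import pandas as pd

# One country's results at different costs; note the 'blip' at cost 300
results = pd.DataFrame(
    {"cost": [100.0, 200.0, 300.0], "cases": [50.0, 40.0, 45.0], "deaths": [5.0, 4.0, 4.5]}
)
sorted_on_cost = results.set_index("cost").sort_index(ascending=True)

# Running minimum ensures cases and deaths never increase as cost increases
monotonic = sorted_on_cost.cummin()
print(monotonic)  # at cost 300, cases become 40.0 and deaths become 4.0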

get_gp()

Returns data-frame of the GP elements that are needed for reporting.

Source code in src/tgftools/analysis.py
def get_gp(self) -> pd.DataFrame:
    """Returns data-frame of the GP elements that are needed for reporting."""

    if self.disease_name != 'HIV':
        gp_data = self.database.gp.df['central'].unstack()
    else:
        # Get GP for HIV
        gp_data = self.portfolio_projection_counterfactual(self.EXPECTED_GP_SCENARIO[0])

        # Convert to the same format as other diseases
        gp_data = gp_data.portfolio_results
        gp_data = pd.concat(gp_data, axis=0).reset_index(level=0).rename({'level_0': 'key'}, axis=1)
        gp_data = gp_data.drop(['model_low', 'model_high'], axis=1)
        gp_data = gp_data.pivot(columns='key', values='model_central')

    return gp_data

get_partner()

Returns data-frame of the partner data that are needed for reporting.

Source code in src/tgftools/analysis.py
def get_partner(self) -> pd.DataFrame:
    """Returns data-frame of the partner data that are needed for reporting."""

    if self.disease_name == 'HIV':
        indicator_partner = ['cases', 'deaths', 'hivneg', 'population']
    elif self.disease_name == 'TB':
        indicator_partner = ['cases', 'deaths', 'deathshivneg', 'population']
    elif self.disease_name == 'MALARIA':
        indicator_partner = ['cases', 'deaths', 'par']

    expected_first_year = self.parameters.get("GRAPH_FIRST_YEAR") - 5
    expected_last_year = self.parameters.get("START_YEAR") + 1

    partner_data = self.database.partner_data.df.loc[
        (self.scenario_descriptor, slice(None), range(expected_first_year, expected_last_year), indicator_partner)].groupby(axis=0, level=['year', 'indicator'])['central'].sum().unstack()

    return partner_data

make_diagnostic_report(plt_show=False, filename=None)

Create a report that compares the results from Approach A and B (and alternative optimisation methods for Approach B, if these are specified).

Parameters:

Name Type Description Default
plt_show Optional[bool] Determines whether to show the plot. False
filename Optional[Path] Filename to save the report to. None

Source code in src/tgftools/analysis.py
def make_diagnostic_report(
        self,
        plt_show: Optional[bool] = False,
        filename: Optional[Path] = None,
):
    """
    Create a report that compares the results from Approach A and B (and alternative optimisation methods for
    Approach B if these are specified).
    :param plt_show: determines whether to show the plot
    :param filename: filename to save the report to
    """
    # Create the approach_b object
    approach_b_object = self._approach_b()

    # Run the report, specifying whether to plot graphs, the filename, and passing through any other kwargs
    # Suppress the returned results as the purpose of this function is generating the report.
    _ = approach_b_object.run(
        plt_show=plt_show,
        filename=filename,
        methods=self.parameters.get('APPROACH_B_METHODS'),
        provide_best_only=False,
    )
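
A minimal usage sketch (assuming `analysis` is an already-constructed Analysis object; the filename is purely illustrative):

from pathlib import Path

# Generate the Approach A vs Approach B comparison report without showing plots interactively
analysis.make_diagnostic_report(
    plt_show=False,
    filename=Path("outputs/diagnostic_report.pdf"),  # illustrative filename
)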

portfolio_projection_approach_a()

Returns the PortfolioProjection For Approach A: i.e., the projection for each country, given the funding to each country when the TGF funding allocated to a country CANNOT be changed.

Source code in src/tgftools/analysis.py
def portfolio_projection_approach_a(self) -> PortfolioProjection:
    """Returns the PortfolioProjection For Approach A: i.e., the projection for each country, given the funding
    to each country when the TGF funding allocated to a country CANNOT be changed.
    """

    country_results = self._get_country_projections_given_funding_dollar_amounts(
        total_funding_by_country=(
            self.tgf_funding.df["value"] + self.non_tgf_funding.df["value"]
        ).to_dict()
    )
    return PortfolioProjection(
        tgf_funding_by_country=self.tgf_funding.df["value"].to_dict(),
        non_tgf_funding_by_country=self.non_tgf_funding.df["value"].to_dict(),
        country_results=country_results,
        portfolio_results=self._make_portfolio_results(
            country_results=country_results,
            adjust_for_unmodelled_innovation=self.innovation_on,
            name='none',
        ),
    )
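
A sketch of how the returned PortfolioProjection might be inspected (assuming `analysis` is an already-constructed Analysis object; the indicator name is illustrative):

pp_a = analysis.portfolio_projection_approach_a()

# Total TGF funding across countries under Approach A
total_tgf = sum(pp_a.tgf_funding_by_country.values())

# Portfolio-level projection for one indicator (e.g. 'deaths'), with years in the rows
deaths_by_year = pp_a.portfolio_results["deaths"]
print(total_tgf)
print(deaths_by_year.head())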

portfolio_projection_approach_b()

Returns the PortfolioProjection For Approach B: i.e., the projection for each country, given the funding to each country when the TGF funding allocated to a country CAN be changed. Multiple optimisation methods may be tried (as specified by the APPROACH_B_METHODS parameter), but only a single result is provided: that of the best solution found. See _get_data_frames_for_approach_b for how the quantities underlying the optimisation are constructed.

Source code in src/tgftools/analysis.py
def portfolio_projection_approach_b(
    self,
) -> PortfolioProjection:
    """Returns the PortfolioProjection For Approach B: i.e., the projection for each country, given the funding
    to each country when the TGF funding allocated to a country _CAN_ be changed. Multiple methods for optimisation
    may be tried, but only a single result is provided (that of the best solution found.)
    :param methods: List of methods to use in approach_b (For method see `do_approach_b`)
    :param optimisation_params: Dict of parameters specifying how to construct the optimisation.
    See `_get_data_frames_for_approach_b`
    """
    # Use the `ApproachB` class to get the TGF funding allocations from the optimisation, getting only the best
    # result.

    methods = self.parameters.get('APPROACH_B_METHODS')

    results_from_approach_b = self._approach_b().do_approach_b(
        methods=methods, provide_best_only=True
    )
    tgf_funding_under_approach_b = results_from_approach_b.tgf_budget_by_country

    country_results = self._get_country_projections_given_funding_dollar_amounts(
        (
            pd.Series(tgf_funding_under_approach_b)
            + self.non_tgf_funding.df["value"]
        ).to_dict()
    )
    return PortfolioProjection(
        tgf_funding_by_country=tgf_funding_under_approach_b,
        non_tgf_funding_by_country=self.non_tgf_funding.df["value"].to_dict(),
        country_results=country_results,
        portfolio_results=self._make_portfolio_results(
            country_results=country_results,
            adjust_for_unmodelled_innovation=self.innovation_on,
            name='none',
        )
    )
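
A sketch comparing the TGF allocation before and after the Approach B optimisation (assuming `analysis` is an already-constructed Analysis object):

import pandas as pd

pp_b = analysis.portfolio_projection_approach_b()

# Compare the optimised TGF allocation with the original allocation used in Approach A
comparison = pd.DataFrame(
    {
        "original_tgf": analysis.tgf_funding.df["value"],
        "reallocated_tgf": pd.Series(pp_b.tgf_funding_by_country),
    }
)
print(comparison)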

portfolio_projection_approach_c(funding_fraction)

Returns the PortfolioProjection For Approach C: i.e., the funding fraction is the same in all countries

Source code in src/tgftools/analysis.py
def portfolio_projection_approach_c(self, funding_fraction: float) -> PortfolioProjection:
    """Returns the PortfolioProjection For Approach C: i.e., the funding fraction is the same in all countries
    """
    country_results = self._get_country_projection_given_funding_fraction(funding_fraction=funding_fraction)
    return PortfolioProjection(
        tgf_funding_by_country=None,  # In this scenario, we do not know the split between TGF and non-TGF sources
        non_tgf_funding_by_country=None,
        country_results=country_results,
        portfolio_results=self._make_portfolio_results(
            country_results=country_results,
            adjust_for_unmodelled_innovation=self.innovation_on,
            name='none',
        ),
    )
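
A sketch of Approach C usage, sweeping a few funding fractions (assuming `analysis` is an already-constructed Analysis object; the fractions chosen are illustrative):

# Project the portfolio when every country receives the same fraction of its funding need
for funding_fraction in (0.8, 0.9, 1.0):
    pp_c = analysis.portfolio_projection_approach_c(funding_fraction=funding_fraction)
    # The TGF/non-TGF split is not known under this approach, so only portfolio_results is inspected
    print(funding_fraction, list(pp_c.portfolio_results.keys()))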

portfolio_projection_counterfactual(name)

Returns a PortfolioProjection for a chosen counterfactual scenario.

Source code in src/tgftools/analysis.py
def portfolio_projection_counterfactual(
        self,
        name: str,
) -> PortfolioProjection:
    """Returns a PortfolioProjection for a chosen counterfactual scenario."""

    assert name in self.database.model_results.df.index.get_level_values('scenario_descriptor'),\
        f"Counterfactual {name} not found in model results."

    # Create dict of country_results corresponding to the counterfactual scenario
    country_results = dict()
    for country in self.countries:

        model_projection = {
            indicator:
                self.database.model_results.df.loc[(name, slice(None), country, slice(None), indicator)]
                .droplevel(axis=0, level='funding_fraction')
                .rename(columns={'central': 'model_central', 'low': 'model_low', 'high': 'model_high'})
            for indicator in self.database.model_results.indicators
        }

        country_results[country] = CountryProjection(
            model_projection=model_projection,
            funding=float('nan'),
        )

    return PortfolioProjection(
        tgf_funding_by_country={k: float('nan') for k in self.countries},
        non_tgf_funding_by_country={k: float('nan') for k in self.countries},
        country_results=country_results,
        portfolio_results=self._make_portfolio_results(country_results, adjust_for_unmodelled_innovation=False, name=name),
    )
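
A sketch of retrieving a counterfactual projection (assuming `analysis` is an already-constructed Analysis object; the scenario name, country code and indicator are illustrative, and the name must exist in the model results):

# Raises an AssertionError if the named scenario is not present in the model results
pp_cf = analysis.portfolio_projection_counterfactual(name="CC_2022")  # illustrative scenario name

# Country-level projection for one country and indicator
projection = pp_cf.country_results["AAA"].model_projection["cases"]
print(projection.head())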

CountryProjection

Bases: NamedTuple

NamedTuple for cases and deaths for a given program cost in a given country.

Source code in src/tgftools/analysis.py
class CountryProjection(NamedTuple):
    """NamedTuple for cases and death for a given program cost in a given country."""

    model_projection: Dict[
        str, pd.DataFrame
    ]  # dict of the form {<indicator>: <pd.DataFrame>}
    funding: float

PortfolioProjection

Bases: NamedTuple

NamedTuple for the results of an Analysis.

Source code in src/tgftools/analysis.py
class PortfolioProjection(NamedTuple):
    """NamedTuple for the results of an Analysis."""

    tgf_funding_by_country: dict[str, float]

    non_tgf_funding_by_country: dict[str, float]

    country_results: dict[
        str, CountryProjection
    ]  # dict of the form {<country>: <CountryProjection>}

    portfolio_results: dict[
        str, pd.DataFrame
    ]  # dict of the form {<indicator>: <pd.DataFrame>}, where the pd.DataFrame has years in the rows, and columns for the central, low and high values (model_central, model_low, model_high)
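
A sketch of how the two NamedTuples nest (assuming `pp` is a PortfolioProjection returned by one of the Analysis methods; the country code and indicator are illustrative):

# Country level: a CountryProjection holds per-indicator dataframes plus the funding amount
country_projection = pp.country_results["AAA"]
cases_df = country_projection.model_projection["cases"]
funding = country_projection.funding

# Portfolio level: one dataframe per indicator, with years in the rows
portfolio_cases = pp.portfolio_results["cases"]
print(funding, cases_df.shape, portfolio_cases.shape)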

Report

This is the BaseClass for Report classes. It provides the core functionality to generate reports. It can be inherited from to allow it to accept sets of PortfolioProjections for the diseases. It is intended that each member function will either: (i) return a Dict of the form {<label>: <stat>}, or (ii) return a pd.DataFrame. These can be assembled into an output Excel file: contents of dicts are written to the 'stats' worksheet; contents of pd.DataFrames are written to their own sheet of the same name.

Source code in src/tgftools/report.py
class Report:
    """This is the BaseClass for Report classes. It provides the core functionality to generate reports. It can be
    inherited from to allow it to accept sets of PortfolioProjections for the diseases. It intended that each member
    function will either: (i) return a Dict of the form {<label>: <stat>}, or (ii) return a pd.DataFrame. These can be
    assembled into an output Excel file: contents of dicts are written to the 'Stats' worksheet; contents of
    pd.DataFrames are written to their own sheet of the same name.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the Report Class"""

    def _get_all_funcs_to_generate_stats(self) -> list[str]:
        """Returns a list of the functions in the class that will generate statistics (i.e., any callable attribute
        with a name that does not start with "_" or "report").
        """
        return sorted(
            [
                name
                for name in dir(self)
                if (
                    not name.startswith("_")
                    and not name.startswith("report")
                    and callable(self.__getattribute__(name))
                )
            ]
        )

    def report(self, filename: Optional[Path] = None) -> Dict:
        """Run all member functions, print the results to screen, returns the results in the form of dictionary and
        (if filename provided) assemble them into an Excel file and draw graphs."""

        all_results_for_stats_pages = dict()  # Storage for all the results
        all_results_for_individual_worksheets = dict()

        all_funcs = self._get_all_funcs_to_generate_stats()
        for ch_name in all_funcs:
            pprint(f"** {ch_name} **")
            output = self.__getattribute__(ch_name)()
            pprint(output)

            if isinstance(output, dict):
                all_results_for_stats_pages[ch_name] = output
            elif isinstance(output, pd.DataFrame):
                all_results_for_individual_worksheets[ch_name] = output
            else:
                raise ValueError(f"Return from {ch_name} function is not of recognised type ({type(ch_name)}).")

        # Compile the results for the 'stats' summary
        results_for_main = list()
        for func_name, func_results in all_results_for_stats_pages.items():
            for stat_name, stat_result in func_results.items():
                results_for_main.append([func_name, stat_name, stat_result])

        if filename is not None:
            # Write to Excel
            wb = Workbook()

            # Write to 'stats' worksheet:
            work_sheet_stats = wb.active
            work_sheet_stats.title = 'stats'
            for line in results_for_main:
                work_sheet_stats.append(line)

            # Write results to 'individual' worksheet
            for func_name, func_results in all_results_for_individual_worksheets.items():
                work_sheet = wb.create_sheet()
                work_sheet.title = func_name[0:10]  # truncate to the first ten characters (Excel limits sheet names to 31 characters)
                for r in dataframe_to_rows(func_results.reset_index(), index=False, header=True):
                    work_sheet.append(r)

            # Do any post-processing that may be required
            self._post_processing_on_workbook(wb)

            # Save
            wb.save(filename)

        return {
            # Returning in the same format as the Excel file:
            # * key='stats': a pd.DataFrame containing all the scalar stats from the individual functions
            # * all other keys/sheets: pd.DataFrames from all the functions that returned pd.DataFrames
            'stats': (
                pd.DataFrame(results_for_main)
                .rename(columns={0: 'Function', 1: 'Key', 2: 'Value'})
            ),
            **all_results_for_individual_worksheets,
        }

    def _post_processing_on_workbook(self, workbook: Workbook):
        """Do anything necessary to post-process the workbook: for instance, create graphs on certain worksheets."""
        pass
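
A minimal sketch of how the Report base class is intended to be used, with two hypothetical member functions (one returning a dict of scalar stats, one returning a pd.DataFrame); the class name, function names, values and output path are all illustrative:

from pathlib import Path

import pandas as pd

from tgftools.report import Report  # assumes Report is importable from this module

class MyDiseaseReport(Report):
    """Illustrative subclass; a real report would be built from PortfolioProjections."""

    def key_stats(self) -> dict:
        # A dict return value is written to the 'stats' worksheet
        return {"cases_averted": 123_456, "deaths_averted": 7_890}

    def cases_by_year(self) -> pd.DataFrame:
        # A DataFrame return value is written to its own worksheet (name truncated to ten characters)
        return pd.DataFrame({"cases": [100, 90, 80]}, index=[2024, 2025, 2026])

results = MyDiseaseReport().report(filename=Path("my_report.xlsx"))  # illustrative path
print(results["stats"])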

__init__(*args, **kwargs)

Initialise the Report Class

Source code in src/tgftools/report.py
def __init__(self, *args, **kwargs):
    """Initialise the Report Class"""

report(filename=None)

Run all member functions, print the results to screen, and return the results in the form of a dictionary; if a filename is provided, also assemble them into an Excel file and draw graphs.

Source code in src/tgftools/report.py
def report(self, filename: Optional[Path] = None) -> Dict:
    """Run all member functions, print the results to screen, returns the results in the form of dictionary and
    (if filename provided) assemble them into an Excel file and draw graphs."""

    all_results_for_stats_pages = dict()  # Storage for all the results
    all_results_for_individual_worksheets = dict()

    all_funcs = self._get_all_funcs_to_generate_stats()
    for ch_name in all_funcs:
        pprint(f"** {ch_name} **")
        output = self.__getattribute__(ch_name)()
        pprint(output)

        if isinstance(output, dict):
            all_results_for_stats_pages[ch_name] = output
        elif isinstance(output, pd.DataFrame):
            all_results_for_individual_worksheets[ch_name] = output
        else:
            raise ValueError(f"Return from {ch_name} function is not of recognised type ({type(ch_name)}).")

    # Compile the results for the 'stats' summary
    results_for_main = list()
    for func_name, func_results in all_results_for_stats_pages.items():
        for stat_name, stat_result in func_results.items():
            results_for_main.append([func_name, stat_name, stat_result])

    if filename is not None:
        # Write to Excel
        wb = Workbook()

        # Write to 'stats' worksheet:
        work_sheet_stats = wb.active
        work_sheet_stats.title = 'stats'
        for line in results_for_main:
            work_sheet_stats.append(line)

        # Write results to 'individual' worksheet
        for func_name, func_results in all_results_for_individual_worksheets.items():
            work_sheet = wb.create_sheet()
            work_sheet.title = func_name[0:10]  # truncate to the first ten characters (Excel limits sheet names to 31 characters)
            for r in dataframe_to_rows(func_results.reset_index(), index=False, header=True):
                work_sheet.append(r)

        # Do any post-processing that may be required
        self._post_processing_on_workbook(wb)

        # Save
        wb.save(filename)

    return {
        # Returning in the same format as the Excel file:
        # * key='stats': a pd.DataFrame containing all the scalar stats from the individual functions
        # * all other keys/sheets: pd.DataFrames from all the functions that returned pd.DataFrames
        'stats': (
            pd.DataFrame(results_for_main)
            .rename(columns={0: 'Function', 1: 'Key', 2: 'Value'})
        ),
        **all_results_for_individual_worksheets,
    }