Source code for RascalC.raw_covariance_matrices

## Script to collect the raw covariance matrices from the output directory of the C++ code

import numpy as np
import os
from glob import glob
from shutil import copy2, copytree
from warnings import warn
from typing import Callable, Iterable
from .utils import rmdir_if_exists_and_empty


def convert_suffix(suffix: str) -> int | str:
    if suffix.isdigit(): return int(suffix)
    if suffix != "full": raise ValueError("Unknown suffix")
    return suffix


def organize_filename(filename: str, output_groups: dict, jack: bool = False) -> None:
    "Interpret the filename as saved by the C++ code and put it into output_groups array. Jack is for jackknife"
    filename_no_ext = os.path.basename(filename) # remove the directory name
    filename_no_ext = ".".join(filename_no_ext.split(".")[:-1]) # remove the extension
    filename_parts = filename_no_ext.split("_") # split the rest by underscore
    term_name = filename_parts[0] # cN, RRN or EEN
    if term_name not in ("c2", "c3", "c4", "RR1", "RR2", "EE1", "EE2"): return # do not process other arrays
    if jack and term_name.startswith("c"): term_name += "j" # add "j" to cN if doing jack
    output_group_name = "_".join(filename_parts[1:-2]) # nN and mM or lL joined back
    indices = filename_parts[-2] # tracer numbers
    try:
        suffix = convert_suffix(filename_parts[-1]) # "full" or subsample number
    except ValueError:
        raise ValueError(f"Unrecognized {suffix = } in {filename = }")

    if output_group_name not in output_groups: # create an empty sub-dictionary
        output_groups[output_group_name] = {}
    
    matrix_name = term_name + "_" + indices
    if matrix_name not in output_groups[output_group_name]: # create an empty sub-sub-dictionary
        output_groups[output_group_name][matrix_name] = {}
    
    output_groups[output_group_name][matrix_name][suffix] = filename


def save_safe(output_dir: str, output_group_name: str, output_dictionary: dict[str]):
    "Save the dictionary of Numpy arrays into directory avoiding name clashes"
    output_filename = os.path.join(output_dir, f"Raw_Covariance_Matrices_{output_group_name}.npz")
    if os.path.exists(output_filename):
        warn(f"The default output filename for group {output_group_name}, {output_filename}, already exists. Will try to find a replacement.")
        i = 1
        while True:
            output_filename = os.path.join(output_dir, f"Raw_Covariance_Matrices_{output_group_name}.{i}.npz")
            if not os.path.exists(output_filename):
                warn(f"Found unused name {output_filename}, will save there.")
                break
            i += 1
    
    os.makedirs(os.path.dirname(output_filename), exist_ok = True) # make sure the directory exists
    np.savez_compressed(output_filename, **output_dictionary)


def collect_raw_covariance_matrices(cov_dir: str, cleanup: bool = True, print_function: Callable[[str], None] = print) -> dict[str, dict[str, np.ndarray[float]]]:
    """
    Collect the covariance matrices from text files written by the C++ code and organize them into a Numpy .npz file.
    With cleanup enabled (default), deletes the text files after collection.
    """

    cov_dir_all = os.path.join(cov_dir, 'CovMatricesAll/')
    cov_dir_jack = os.path.join(cov_dir, 'CovMatricesJack/')

    output_groups = {}

    # load the full matrices
    for input_filename in glob(cov_dir_all + "*.txt"):
        organize_filename(input_filename, output_groups, jack = False)

    # load the jack matrices if present
    for input_filename in glob(cov_dir_jack + "*.txt"):
        organize_filename(input_filename, output_groups, jack = True)
    
    print_function(f"Detected {len(output_groups)} output group(s) in {cov_dir}")

    return_dictionary = {}
    
    for output_group_name, output_group in output_groups.items():
        print_function(f"Processing output group {output_group_name}")
        # check that the different matrices have the same number of subsamples
        subsample_numbers = []
        for matrix_filenames_dictionary in output_group.values():
            subsample_number = 0
            for suffix in matrix_filenames_dictionary.keys():
                if isinstance(suffix, int): subsample_number += 1
                # subsamples have integer suffixes
            subsample_numbers.append(subsample_number)
        subsample_number = min(subsample_numbers)

        if any(this_subsample_number != subsample_number for this_subsample_number in subsample_numbers):
            warn(f"Some matrices in output group {output_group_name} have different number of subsamples. Will cut to the smallest number of subsamples.")
            # now cut all to the minimal number of subsamples, using the lowest numbers present
            for matrix_filenames_dictionary in output_group.values():
                subsample_suffixes_increasing = sorted([suffix for suffix in matrix_filenames_dictionary.keys() if isinstance(suffix, int)])
                if len(subsample_suffixes_increasing) == 0: continue # some arrays will not have subsamples
                subsample_suffix_max = subsample_suffixes_increasing[subsample_number - 1]
                for suffix in matrix_filenames_dictionary.keys():
                    if isinstance(suffix, int) and suffix > subsample_suffix_max:
                        matrix_filenames_dictionary.pop(suffix)
        
        # now create and fill the dictionary to be saved in the numpy file
        output_dictionary = {}
        for matrix_name, matrix_filenames_dictionary in output_group.items():
            output_dictionary[matrix_name] = dict()
            for suffix, input_filename in matrix_filenames_dictionary.items():
                matrix = np.loadtxt(input_filename)
                if matrix_name.startswith("c2") and matrix.ndim == 1: matrix = np.diag(matrix) # convert 1D c2 to a 2D diagonal matrix
                output_dictionary[matrix_name][suffix] = matrix

            # special treatment for string suffixes (at the moment, only "full")
            tmp_keys = list(output_dictionary[matrix_name].keys())
            for suffix in tmp_keys:
                if isinstance(suffix, str):
                    output_dictionary[matrix_name + "_" + suffix] = output_dictionary[matrix_name].pop(suffix)
                    # this creates a separate array to be saved

            # now all the remaining suffixes must be integers so can be sorted easily
            output_dictionary[matrix_name] = np.array([output_dictionary[matrix_name][i_subsample] for i_subsample in sorted(output_dictionary[matrix_name].keys())])
            # this transformed the dictionary to numpy array, ordered by increasing subsample index

            # calculate the full as the mean of the subsamples
            full_matrix_computed = np.mean(output_dictionary[matrix_name], axis = 0)
            full_matrix_name = matrix_name + "_full"
            if full_matrix_name in output_dictionary:
                if not np.allclose(output_dictionary[full_matrix_name], full_matrix_computed):
                    warn(f"For {matrix_name} matrix, the loaded full is different from the average of subsamples. The latter will be saved.")
                    matrix_filenames_dictionary.pop("full") # remove the filename since it will be technically unused
            output_dictionary[full_matrix_name] = full_matrix_computed
        
        return_dictionary[output_group_name] = output_dictionary
        
        save_safe(cov_dir, output_group_name, output_dictionary)

        # now that the file is saved (not any earlier to be sure), can remove all the text files
        # the list contains only the files that had their contents loaded and saved
        if cleanup:
            for matrix_filenames_dictionary in output_group.values():
                for input_filename in matrix_filenames_dictionary.values():
                    os.remove(input_filename)
        
        print_function(f"Finished with output group {output_group_name}")

    # remove subdirectories too if they are empty
    if cleanup:
        rmdir_if_exists_and_empty(cov_dir_all)
        rmdir_if_exists_and_empty(cov_dir_jack)

    return return_dictionary


def load_raw_covariances(file_root: str, label: str, n_samples: None | int | Iterable[int] | Iterable[bool] = None, print_function: Callable[[str], None] = print) -> dict[str]:
    """
    Load the raw covariance matrices as a dictionary. Uses the Numpy file if it exists, otherwise tried to run the collection function.

    file_root is the directory to look in.

    label specifies the number of radial bins and either the number of angular (mu) bins (nN_mM) or the maximum (even) multipole index (nN_lL).

    n_samples allows to select subsamples flexibly:
    - if None (default) returns all samples;
    - if a positive integer, returns as many samples from the beginning;
    - if a sequence of integers, returns subsamples with indices from this sequence;
    - sequence of boolean values is interpreted as a boolean mask for subsamples.
    """
    input_filename = os.path.join(file_root, f"Raw_Covariance_Matrices_{label}.npz")
    if os.path.isfile(input_filename): raw_cov = dict(np.load(input_filename))
    else:
        print_function(f"Collecting the raw covariance matrices from {file_root}")
        result = collect_raw_covariance_matrices(file_root, print_function = print_function)
        if label not in result:
            raise ValueError(f"Raw covariance matrices for {label} not produced. Check the n and m/max_l values.")
        raw_cov = result[label]
    if n_samples is None: return raw_cov # return the full set
    elif isinstance(n_samples, int):
        if n_samples <= 0: raise ValueError("Number of samples must be positive if integer")
        n_samples = np.arange(n_samples)
    elif isinstance(n_samples, Iterable):
        if all(isinstance(_, int) for _ in n_samples): n_samples = np.array(n_samples, dtype = int)
        elif all(isinstance(_, bool) for _ in n_samples): n_samples = np.array(n_samples, dtype = bool)
        else: raise TypeError("n_samples elements must be all either int (indices) or bool (mask)")
    else: raise TypeError("n_samples must be None, positive int or iterable of int or bool")
    # select the given samples and update the averages
    keys = [key for key in raw_cov.keys() if not key.endswith("_full")]
    for key in keys:
        raw_cov[key] = raw_cov[key][n_samples]
        raw_cov[key + "_full"] = np.mean(raw_cov[key], axis = 0)
    return raw_cov


def load_raw_covariances_smu(file_root: str, n: int, m: int, n_samples: None | int | Iterable[int] | Iterable[bool] = None, print_function: Callable[[str], None] = print) -> dict[str]:
    """
    Load the raw covariance matrices from the s_mu mode as a dictionary. Uses the Numpy file if it exists, otherwise tried to run the collection function.

    file_root is the directory to look in.
    n and m are numbers of radial and angular (mu) bins respectively.

    n_samples allows to select subsamples flexibly:
    - if None (default) returns all samples;
    - if a positive integer, returns as many samples from the beginning;
    - if a sequence of integers, returns subsamples with indices from this sequence;
    - sequence of boolean values is interpreted as a boolean mask for subsamples.
    """
    label = f"n{n}_m{m}"
    return load_raw_covariances(file_root, label, n_samples, print_function)


def load_raw_covariances_legendre(file_root: str, n: int, max_l: int, n_samples: None | int | Iterable[int] | Iterable[bool] = None, print_function = print) -> dict[str]:
    """
    Load the raw covariance matrices from the Legendre modes as a dictionary. Uses the Numpy file if it exists, otherwise tried to run the collection function.

    file_root is the directory to look in.
    n and m are numbers of radial and angular (mu) bins respectively.

    n_samples allows to select subsamples flexibly:
    - if None (default) returns all samples;
    - if a positive integer, returns as many samples from the beginning;
    - if a sequence of integers, returns subsamples with indices from this sequence;
    - sequence of boolean values is interpreted as a boolean mask for subsamples.
    """
    label = f"n{n}_l{max_l}"
    return load_raw_covariances(file_root, label, n_samples, print_function)


[docs] def cat_raw_covariance_matrices(n: int, mstr: str, input_roots: list[str], ns_samples: list[None | int | Iterable[int] | Iterable[bool]], output_root: str, collapse_factor: int = 1, print_function: Callable[[str], None] = print) -> dict[str]: """ Catenate the raw covariance matrices from one or more input directories, assuming they are from similar runs: the same number of input random points, ``N2``, ``N3``, ``N4`` and ``loops_per_sample``. Parameters ---------- n : integer The number of radial bins. mstr : string The next part of the output configuration label: ``m{number of angular bins}`` in s_mu mode and ``l{max_multipole}`` in Legendre modes. input_roots : list of strings Input directories to catenate from. ns_samples : list of the same length as ``input_roots`` Each element allows to select subsamples flexibly from the corresponding input directory: - if None, returns all samples; - if a positive integer, returns as many samples from the beginning; - if a sequence of integers, returns subsamples with indices from this sequence; - sequence of boolean values is interpreted as a boolean mask for subsamples. output_root : string The output directory, in which the NumPy file with the resulting raw covariance matrix arrays is created. collapse_factor : positive integer (Optional) allows to reduce the number of subsamples by averaging over a given number of adjacent subsamples. Default value is 1, meaning no reduction. print_function : Callable (Optional) custom function to use for printing. Default is ``print``. Returns ------- catenation_results : dict[str, np.ndarray[float]] Catenated raw covariance matrices as a dictionary with string keys and Numpy array values. All this information is also saved in a ``Raw_Covariance_Matrices*.npz`` file in the ``output_root``. """ if collapse_factor <= 0: raise ValueError("Collapsing factor must be positive") if len(input_roots) < 1: raise ValueError("Need at least one input directory") if len(ns_samples) != len(input_roots): raise ValueError("Number of input dirs and subsamples to use from them must be the same") label = f"n{n}_{mstr}" result = {} for index, (input_root, n_samples) in enumerate(zip(input_roots, ns_samples)): input_file = load_raw_covariances(input_root, label, n_samples, print_function) # ignore full arrays input_file = {key: value for (key, value) in input_file.items() if not key.endswith("_full")} # check that the keys are the same, unless the result is brand new if len(result) > 0: result_keys = set(result.keys()) input_keys = set(input_file.keys()) if result_keys != input_keys: warn("Different sets of matrices present among the input files, will only use the overlapping ones.") common_keys = result_keys & input_keys result = {key: result[key] for key in common_keys} input_file = {key: input_file[key] for key in common_keys} # finally, loop over all the arrays for matrix_name, matrices in input_file.items(): if matrix_name.endswith("_full"): continue # ignore full arrays if index != 0: result[matrix_name] = np.append(result[matrix_name], matrices, axis = 0) else: result[matrix_name] = matrices # loop over all the matrix names for matrix_name in list(result.keys()): # the dictionary will be changed if collapse_factor > 1: matrix_shape = result[matrix_name].shape result[matrix_name] = np.mean(result[matrix_name].reshape(matrix_shape[0] // collapse_factor, collapse_factor, *matrix_shape[1:]), axis = 1) # average over adjacent collapse_factor samples # make full arrays by averaging the subsamples matrix_name_full = matrix_name + "_full" result[matrix_name_full] = np.mean(result[matrix_name], axis = 0) save_safe(output_root, label, result) # copy other useful files from the first input root, unless identical with the output root # assuming they are the same among output roots; otherwise catenation should not be sensible if not os.path.samefile(input_roots[0], output_root): for pattern in ("weights", "xi*", "radial_binning*.csv"): for filename in glob(pattern, root_dir = input_roots[0]): src = os.path.join(input_roots[0], filename) (copytree if os.path.isdir(src) else copy2)(src, os.path.join(output_root, filename)) # need different functions for dirs and files return result