Source code for RascalC.interface

"""Implements an interface to estimate the covariance of 2-point correlation function."""

import pycorr
import numpy as np
import os
from datetime import datetime
from typing import Iterable, Literal
from warnings import warn
from .pycorr_utils.utils import fix_bad_bins_pycorr, write_xi_file
from .write_binning_file import write_binning_file
from .pycorr_utils.jack import get_jack_xi_weights_counts_from_pycorr
from .pycorr_utils.counts import get_counts_from_pycorr
from .pycorr_utils.input_xi import get_input_xi_from_pycorr
from .mu_bin_legendre_factors import write_mu_bin_legendre_factors
from .correction_function import compute_correction_function, compute_correction_function_multi
from .convergence_check_extra import convergence_check_extra
from .utils import rmdir_if_exists_and_empty


suffixes_tracer_all = ("", "2") # all supported tracer suffixes
indices_corr_all = ("11", "12", "22") # all supported 2PCF indices
suffixes_corr_all = ("", "12", "2") # all supported 2PCF suffixes
tracer1_corr = (0, 0, 1)
tracer2_corr = (0, 1, 1)


[docs] def run_cov(mode: Literal["s_mu", "legendre_projected", "legendre_accumulated"], nthread: int, N2: int, N3: int, N4: int, n_loops: int, loops_per_sample: int, out_dir: str, tmp_dir: str, randoms_positions1: np.ndarray[float], randoms_weights1: np.ndarray[float], pycorr_allcounts_11: pycorr.twopoint_estimator.BaseTwoPointEstimator, xi_table_11: pycorr.twopoint_estimator.BaseTwoPointEstimator | tuple[np.ndarray[float], np.ndarray[float], np.ndarray[float]] | tuple[np.ndarray[float], np.ndarray[float], np.ndarray[float], np.ndarray[float], np.ndarray[float]] | list[np.ndarray[float]], position_type: Literal["rdd", "xyz", "pos"] = "pos", xi_table_12: None | pycorr.twopoint_estimator.BaseTwoPointEstimator | tuple[np.ndarray[float], np.ndarray[float], np.ndarray[float]] | tuple[np.ndarray[float], np.ndarray[float], np.ndarray[float], np.ndarray[float], np.ndarray[float]] | list[np.ndarray[float]] = None, xi_table_22: None | pycorr.twopoint_estimator.BaseTwoPointEstimator | tuple[np.ndarray[float], np.ndarray[float], np.ndarray[float]] | tuple[np.ndarray[float], np.ndarray[float], np.ndarray[float], np.ndarray[float], np.ndarray[float]] | list[np.ndarray[float]] = None, xi_cut_s: float = 250, xi_refinement_iterations: int = 10, pycorr_allcounts_12: pycorr.twopoint_estimator.BaseTwoPointEstimator | None = None, pycorr_allcounts_22: pycorr.twopoint_estimator.BaseTwoPointEstimator | None = None, normalize_wcounts: bool = True, no_data_galaxies1: float | None = None, no_data_galaxies2: float | None = None, randoms_samples1: np.ndarray[int] | None = None, randoms_positions2: np.ndarray[float] | None = None, randoms_weights2: np.ndarray[float] | None = None, randoms_samples2: np.ndarray[int] | None = None, max_l: int | None = None, boxsize: float | None = None, skip_s_bins: int | tuple[int, int] = 0, skip_l: int = 0, shot_noise_rescaling1: float = 1, shot_noise_rescaling2: float = 1, sampling_grid_size: int = 301, coordinate_scaling: float = 1, seed: int | None = None, verbose: bool = False) -> dict[str, np.ndarray[float]]: r""" Run the 2-point correlation function covariance integration. Parameters ---------- mode : string Choice of binning setup, one of: - ``"s_mu"``: compute covariance of the correlation function in s, µ bins. Only linear µ binning between 0 and 1 supported. - ``"legendre_projected"``: compute covariance of the correlation function Legendre multipoles in separation (s) bins projected from µ bins (only linear µ binning supported between 0 and 1). Procedure matches ``pycorr``. Works with jackknives, may be less efficient in periodic geometry. - ``"legendre_accumulated"``: compute covariance of the correlation function Legendre multipoles in separation (s) bins accumulated directly, without first doing µ-binned counts. Incompatible with jackknives. max_l : integer Max Legendre multipole index (required in both Legendre ``mode``\s). Has to be even. boxsize : None or float Periodic box side (one number — so far, only cubic boxes are supported). All the coordinates need to be between 0 and ``boxsize``. If None (default), assumed aperiodic. position_type : string, default="pos" Type of input positions, one of: - "rdd": RA, Dec (both in degrees), distance; shape (3, N) - "xyz": Cartesian positions, shape (3, N) - "pos": Cartesian positions, shape (N, 3). randoms_positions1 : array of floats, shaped according to `position_type` Coordinates of random points for the first tracer. randoms_weights1 : array of floats of length N_randoms Weights of random points for the first tracer. randoms_samples1 : None or array of integers of length N_randoms Jackknife region numbers for random points for the first tracer. If given (and not None), enables the jackknife functionality (tuning of shot-noise rescaling on jackknife correlation function estimates). The jackknife assignment must match the jackknife counts in ``pycorr_allcounts_11`` (and ``pycorr_allcounts_12`` with multi-tracer functionality enabled). randoms_positions2 : None or array of floats, shaped according to `position_type` (Optional) coordinates of random points for the second tracer. If given and not None, enables the multi-tracer functionality (full two-tracer covariance estimation). randoms_weights2 : None or array of floats of length N_randoms2 (Optional) weights of random points for the second tracer (required for multi-tracer functionality). randoms_samples2 : None or array of integers of length N_randoms2 (Optional) jackknife region numbers for the second tracer (required for multi-tracer + jackknife functionality, although this combination has not been used yet). The jackknife assignment must match the jackknife counts in ``pycorr_allcounts_12`` and ``pycorr_allcounts_22``. pycorr_allcounts_11 : ``pycorr.TwoPointEstimator`` ``pycorr.TwoPointEstimator`` with auto-counts for the first tracer. Must be rebinned and/or cut to the separation (s) bins desired for the covariance. Note that more bins result in slower convergence. A typical configuration has been 4 Mpc/h wide bins between 20 and 200 Mpc/h. The counts will be wrapped to positive µ, so if the µ range in them is from -1 to 1, the number of µ bins must be even. Providing unwrapped counts (µ from -1 to 1) is preferrable, because some issues can be fixed by assuming symmetry. - In "s_mu" ``mode``, the covariance will be done for the given number of µ bins (after wrapping). - In "legendre_projected" ``mode``, it will be assumed that Legendre multipoles are projected from the same number of µ bins as present in these counts. One might consider rebinning more coarsely for faster performance but less guaranteed accuracy (neither effect has been tested yet). - In "legendre_accumulated" ``mode``, all the present µ bins (after wrapping) will be used to fit the survey correction functions. For jackknife functionality, ``pycorr_allcounts_11`` must contain jackknife RR counts and correlation function. The jackknife assigment must match ``randoms_samples1``. pycorr_allcounts_12 : ``pycorr.TwoPointEstimator`` (Optional) ``pycorr.TwoPointEstimator`` with cross-counts between the two tracers (order does not matter, because they will be wrapped). Must have the same bin configuration as ``pycorr_allcounts_11``. For jackknife functionality, must contain jackknife RR counts and correlation function. The jackknife assigment must match ``randoms_samples1`` and ``randoms_samples2``. pycorr_allcounts_22 : ``pycorr.TwoPointEstimator`` (Optional) ``pycorr.TwoPointEstimator`` with auto-counts for the second tracer. Must have the same bin configuration as ``pycorr_allcounts_11``. For jackknife functionality, must contain jackknife RR counts and correlation function. The jackknife assigment must match ``randoms_samples2``. normalize_wcounts : boolean (Optional) whether to normalize the weights and weighted counts. If False, the provided RR counts must match what can be obtained from given randoms, otherwise the covariance matrix will be off by a constant factor. Example: if counts were computed with ``n_randoms`` roughly similar random chunks and only one is provided to RascalC here, the counts should be divided by ``n_random`` where ``s > split_above`` and by ``n_random ** 2`` where ``s < split_above``. If True (default), the weights will be normalized so that their sum is 1 and the counts will be normalized by their ``wnorm``, which gives a match with default ``pycorr`` normalization settings. no_data_galaxies1 : None or float (Optional) number of first tracer data (not random!) points for the covariance rescaling. If None (default), the code will attempt to obtain it from ``pycorr_allcounts_11``. no_data_galaxies2 : None or float (Optional) number of second tracer data (not random!) points for the covariance rescaling. If None (default), the code will attempt to obtain it from ``pycorr_allcounts_22``. xi_table_11 : ``pycorr.TwoPointEstimator``, or sequence (tuple or list) of 3 elements: (s_values, mu_values, xi_values), or sequence (tuple or list) of 4 elements: (s_values, mu_values, xi_values, s_edges) Table of first tracer auto-correlation function in separation (s) and µ bins. The code will use it for interpolation in the covariance matrix integrals. Important: if the given correlation function is an average in s, µ bins, the separation bin edges need to be provided (and the µ bins are assumed to be linear) for rescaling procedure which ensures that the interpolation results averaged over s, µ bins returns the given correlation function. In case of ``pycorr.TwoPointEstimator``, the edges will be recovered automatically. Unwrapped estimators (µ from -1 to 1) are preferred, because symmetry allows to fix some issues. In the sequence format: - s_values must be a 1D array of reference separation (s) values for the table, of length N; - mu_values must be a 1D array of reference µ values (covering the range from 0 to 1) for the table, of length M; - xi_values must be an array of correlation function values at those s, µ values of shape (N, M); - s_edges, if given, must be a 1D array of separation bin edges of length N+1. The bins must come close to zero separation (say start at ``s <= 0.01``). The sequence containing 3 elements should be used for theoretical models evaluated at a grid of s, mu values. The 4-element format should be used for bin-averaged estimates. xi_table_12 : None or the same format as ``xi_table_11`` Table of the two tracer's cross-correlation function in separation (s) and µ bins. The code will use it for interpolation in the covariance matrix integrals. Required for multi-tracer functionality. xi_table_22 : None or the same format as ``xi_table_11`` Table of second tracer auto-correlation function in separation (s) and µ bins. The code will use it for interpolation in the covariance matrix integrals. Required for multi-tracer functionality. xi_cut_s : float (Optional) separation value beyond which the correlation function is assumed to be zero for the covariance matrix integrals. Default: 250. Between the maximum separation from ``xi_table``\s and ``xi_cut_s``, the correlation function is extrapolated as :math:`\propto s^{-4}`. xi_refinement_iterations : integer (Optional) number of iterations in the correlation function refinement procedure for interpolation inside the code, ensuring that the bin-averaged interpolated values match the binned correlation function estimates. Default: 10. Important: the refinement procedure is disabled completely regardless of this setting if the ``xi_table``\s are sequences of 3 elements, since they are presumed to be a theoretical model evaluated at a grid of s, mu values and not bin averages. nthread : integer Number of (OpenMP) threads to use. Can not utilize more threads than ``n_loops``. - IMPORTANT: AVOID multi-threading in the Python process calling this function (e.g. at NERSC this would mean not setting ``OMP_*`` and other ``*_THREADS`` environment variables; the code should be able to set them by itself). Otherwise the code may run effectively single-threaded. If you need other multi-threaded calculations, run them separately or spawn sub-processes. N2 : integer Number of secondary points to sample per each primary random point. Setting too low (below 5) is not recommended. N3 : integer Number of tertiary points to sample per each secondary point. Setting too low (below 5) is not recommended. N4 : integer Number of quaternary points to sample per each tertiary point. Setting too low (below 5) is not recommended. n_loops : integer Number of integration loops. For optimal balancing and minimal idle time, should be a few times (at least twice) ``nthread`` and exactly divisible by it. The runtime roughly scales as the number of quads per the number of threads, :math:`\mathcal{O}`\(``N_randoms * N2 * N3 * N4 * n_loops / nthread``). For reference, on NERSC Perlmutter CPU half-node the code processed about 27 millions (2.7e7) quads per second per thread (using 64 threads on half a node) as of December 2024. (In Legendre projected mode, which is probably the slowest, with ``N2 = 5``, ``N3 = 10``, ``N4 = 20``.) In single-tracer mode, the number of quads is ``N_randoms * N2 * N3 * N4 * n_loops``. In two-tracer mode, the number of quads is ``(5 * N_randoms1 + 2 * N_randoms2) * N2 * N3 * N4 * n_loops``. loops_per_sample : integer Number of loops to merge into one output sample. Must divide ``max_loops``. Recommended to keep the number of samples = ``n_loops / loops_per_sample`` roughly between 10 and 30. out_dir : string Directory for important outputs. Moderate disk space required (up to a few hundred megabytes), but increases with covariance matrix size and number of samples (see above). tmp_dir : string Directory for temporary files. Can be deleted after running the code, and is cleaned up after the normal execution. More disk space required - needs to store all the input arrays in the current implementation. skip_s_bins : integer or tuple of two integers (Optional) removal of separations bins at the post-processing stage. First (or the only) number sets the number of radial/separation bins to skip from the beginning (lowest-separation bins tend to converge worse and probably will not be precise due to the limitations of the formalism). Second number (if provided) sets the number of radial/separation bins to skip from the end. By default, no bins are skipped at the post-processing stage. skip_l : integer (Only for the Legendre modes; optional) number of highest (even) multipoles to skip at the post-processing stage. (Higher multipole moments of the correlation function tend to converge worse.) Default 0 (no skipping). shot_noise_rescaling1 : float (Optional) shot-noise rescaling value for the first tracer if known beforehand. Default 1 (no rescaling). Will be ignored in jackknife mode - then shot-noise rescaling is optimized on the auto-covariance. shot_noise_rescaling2 : float (Optional) shot-noise rescaling value for the second tracer if known beforehand. Default 1 (no rescaling). Will be ignored in jackknife mode - then shot-noise rescaling is optimized on the auto-covariance. seed : integer or None (Optional) If given as an integer, allows to reproduce the results with the same settings, except the number of threads. Individual subsamples may differ because they are accumulated/written in order of loop completion which may depend on external factors at runtime, but the final integrals should be the same. If None (default), the initialization will be random. sampling_grid_size : integer (Optional) first guess for the sampling grid size. The code should be able to find a suitable number automatically. verbose : bool (Optional) report each 5% of each loop's progress by printing. Default False (off). This can be a lot of output, only use when the number of loops is small. coordinate_scaling : float (Optional) scaling factor for all the Cartesian coordinates. Default 1 (no rescaling). This option is supported by the C++ code, but its use cases are not very clear. Returns ------- post_processing_results : dict[str, np.ndarray[float]] Post-processing results as a dictionary with string keys and Numpy array values. All this information is also saved in a ``Rescaled_Covariance_Matrices*.npz`` file in the output directory. Selected common keys are: ``"full_theory_covariance"`` for the final covariance matrix and ``"shot_noise_rescaling"`` for the shot-noise rescaling value(s). There will also be a ``Raw_Covariance_Matices*.npz`` file in the output directory (as long as the C++ code has run without errors), which can be post-processed separately in a different way. """ if mode not in ("s_mu", "legendre_accumulated", "legendre_projected"): raise ValueError("Given mode not supported") # Set mode flags legendre_orig = (mode == "legendre_accumulated") legendre_mix = (mode == "legendre_projected") legendre = legendre_orig or legendre_mix jackknife = randoms_samples1 is not None if (legendre_orig and jackknife): raise ValueError("Direct accumulation Legendre mode is not compatible with jackknives") if legendre: if max_l is None: raise TypeError("Max ell must be provided in Legendre mode") if max_l % 2 != 0: raise ValueError("Only even Legendre multipoles supported") # Set some other flags periodic = bool(boxsize) # False for None (default) and 0 two_tracers = randoms_positions2 is not None if two_tracers: # check that everything is set accordingly if randoms_weights2 is None: raise TypeError("Second tracer weights must be provided in two-tracer mode") if jackknife: # although this case has not been used so far if randoms_samples2 is None: raise TypeError("Second tracer jackknife region numbers must be provided in two-tracer jackknife mode") if legendre_mix: warn("Projected Legendre post-processing for jackknife not implemented for multi-tracer. Please contact the developer for a workaround") if pycorr_allcounts_12 is None: raise TypeError("Cross-counts must be provided in two-tracer mode") if pycorr_allcounts_22 is None: raise TypeError("Second tracer auto-counts must be provided in two-tracer mode") if xi_table_12 is None: raise TypeError("Cross-correlation function must be provided in two-tracer mode") if xi_table_22 is None: raise TypeError("Second tracer auto-correlation function must be provided in two-tracer mode") if n_loops % loops_per_sample != 0: raise ValueError("The sample collapsing factor must divide the number of loops") ntracers = 2 if two_tracers else 1 ncorr = ntracers * (ntracers + 1) // 2 suffixes_tracer = suffixes_tracer_all[:ntracers] indices_corr = indices_corr_all[:ncorr] suffixes_corr = suffixes_corr_all[:ncorr] s_edges = pycorr_allcounts_11.edges[0] n_r_bins = len(s_edges) - 1 n_mu_bins = pycorr_allcounts_11.shape[1] if pycorr_allcounts_11.edges[1][0] < 0: n_mu_bins //= 2 # will be wrapped if jackknife: jack_region_numbers = np.unique(randoms_samples1) njack = len(jack_region_numbers) if two_tracers: if not np.array_equal(jack_region_numbers, np.unique(randoms_samples2)): # comparison is good because unique results are sorted raise ValueError("The sets of jackkknife labels of the two tracers must be the same") # set the technical filenames input_filenames = [os.path.join(tmp_dir, str(t) + ".txt") for t in range(ntracers)] cornames = [os.path.join(out_dir, f"xi/xi_{index}.dat") for index in indices_corr] binned_pair_names = [os.path.join(out_dir, "weights/" + ("binned_pair" if jackknife else "RR") + f"_counts_n{n_r_bins}_m{n_mu_bins}" + (f"_j{njack}" if jackknife else "") + f"_{index}.dat") for index in indices_corr] if jackknife: jackknife_weights_names = [os.path.join(out_dir, f"weights/jackknife_weights_n{n_r_bins}_m{n_mu_bins}_j{njack}_{index}.dat") for index in indices_corr] xi_jack_names = [os.path.join(out_dir, f"xi_jack/xi_jack_n{n_r_bins}_m{n_mu_bins}_j{njack}_{index}.dat") for index in indices_corr] jackknife_pairs_names = [os.path.join(out_dir, f"weights/jackknife_pair_counts_n{n_r_bins}_m{n_mu_bins}_j{njack}_{index}.dat") for index in indices_corr] if legendre_orig: phi_names = [os.path.join(out_dir, f"BinCorrectionFactor_n{n_r_bins}_" + ("periodic" if periodic else f'm{n_mu_bins}') + f"_{index}.txt") for index in indices_corr] # make sure the dirs exist # os.makedirs() will become confused if the path elements to create include pardir (eg. “..” on UNIX systems). # os.path.abspath should prevent this out_dir_safe = os.path.abspath(out_dir) os.makedirs(out_dir_safe, exist_ok = True) os.makedirs(os.path.join(out_dir_safe, "xi"), exist_ok = True) os.makedirs(os.path.join(out_dir_safe, "weights"), exist_ok = True) if jackknife: os.makedirs(os.path.join(out_dir_safe, "xi_jack"), exist_ok = True) # before creating the temporary directory, find which of its parent directories do not exist (yet), to remove them once they are not needed, but only if they are empty tmp_max_iterations = 100 tmp_dirs_to_clean_up = [] tmp_path = os.path.abspath(tmp_dir) # start walking up the path from the temporary directory; this should also remove the slash at the end of tmp_dir if it was there for _ in range(tmp_max_iterations): # this could be a "while not" loop, but I wanted to prevent a possibility of an endless/unreasonably long loop if os.path.exists(tmp_path): break # terminate the loop when we found a pre-existing directory tmp_dirs_to_clean_up.append(tmp_path) # if we reach this, this directory does not exist, add it to the cleanup list tmp_path = os.path.dirname(tmp_path) # should always jump to the parent directory else: # this is when the loop executes all tmp_max_iterations iterations without encountering the break statement, i.e. potentially becomes infinite even though it shouldn't tmp_dirs_to_clean_up = [os.path.abspath(tmp_dir)] # reset the cleanup list to the temporary directory only (the old default behavior) warn(f"The check of the newly created temporary directory path components failed to finish in {tmp_max_iterations} steps. This is not critical, but should not happen. Please inform the RascalC maintainer of this occurrence.") os.makedirs(os.path.abspath(tmp_dir), exist_ok = True) # finally, create the temporary dir and all its missing parents # Create a log file in output directory logfilename = "log.txt" logfile = os.path.join(out_dir, logfilename) def print_log(s: object) -> None: os.system(f"echo \"{s}\" >> {logfile}") def print_and_log(s: object) -> None: print(s) print_log(s) print_and_log(f"Mode: {mode}") print_and_log(f"Periodic box: {periodic}") if periodic: print_and_log(f"Box side: {boxsize}") print_and_log(f"Jackknife: {jackknife}") print_and_log(f"Number of tracers: {1 + two_tracers}") print_and_log(f"Normalizing weights and weighted counts: {normalize_wcounts}") print_and_log(datetime.now()) counts_factor = None if normalize_wcounts else 1 ndata = [no_data_galaxies1, no_data_galaxies2][:ntracers] # convert counts and jackknife xi if needed; loading ndata too whenever not given pycorr_allcounts_all = (pycorr_allcounts_11, pycorr_allcounts_12, pycorr_allcounts_22) for c, pycorr_allcounts in enumerate(pycorr_allcounts_all[:ncorr]): if c and not np.allclose(pycorr_allcounts.edges[0], pycorr_allcounts_11.edges[0]): raise ValueError(f"pycorr_allcounts_{indices_corr[c]} separation/radial binning is not consistent with pycorr_allcounts_11") if pycorr_allcounts.edges[1][0] < 0: # try to fix and wrap pycorr_allcounts = fix_bad_bins_pycorr(pycorr_allcounts) print_and_log(f"Wrapping pycorr_allcounts_{indices_corr[c]} to µ>=0") pycorr_allcounts = pycorr_allcounts.wrap() if len(pycorr_allcounts.edges[1]) != n_mu_bins + 1: raise ValueError(f"The number of angular/µ bins in pycorr_allcounts_{indices_corr[c]} is not consistent with the number determined from pycorr_allcounts_11") if not np.allclose(pycorr_allcounts.edges[1], np.linspace(0, 1, n_mu_bins + 1)): raise ValueError(f"pycorr_allcounts_{indices_corr[c]} mu/µ binning is not consistent with linear between 0 and 1 (after wrapping)") RR_counts = get_counts_from_pycorr(pycorr_allcounts, counts_factor) np.savetxt(binned_pair_names[c], RR_counts.reshape(-1, 1)) # the file needs to have 1 column if jackknife: xi_jack, jack_weights, jack_RR_counts = get_jack_xi_weights_counts_from_pycorr(pycorr_allcounts, counts_factor) if not np.allclose(np.sum(jack_RR_counts, axis=0), RR_counts.ravel()): raise ValueError("Total counts mismatch") ## Write to files using numpy functions write_xi_file(xi_jack_names[c], pycorr_allcounts.sepavg(axis = 0), pycorr_allcounts.sepavg(axis = 1), xi_jack) jack_numbers = pycorr_allcounts.realizations # column of jackknife numbers, may be useless but needed for format compatibility if not np.array_equal(np.array(jack_region_numbers, dtype = int), np.sort(np.array(jack_numbers, dtype = int))): raise ValueError(f"The code requires integer jackknife numbers consistent between the randoms_samples1 and pycorr_allcounts_{indices_corr[c]}") # jack_region_numbers are the unique jackknife labels from the data (already sorted) np.savetxt(jackknife_weights_names[c], np.column_stack((jack_numbers, jack_weights))) np.savetxt(jackknife_pairs_names[c], np.column_stack((jack_numbers, jack_RR_counts))) # not really needed for the C++ code or processing but let it be # fill ndata if not given if not ndata[tracer1 := tracer1_corr[c]]: ndata[tracer1] = pycorr_allcounts.D1D2.size1 if any(not tracer_ndata for tracer_ndata in ndata): raise ValueError("Not given and not recovered all the necessary normalization factors (no_data_galaxies1/2)") print_and_log(f"Number(s) of data galaxies: {ndata}") # write the xi file(s); need to set the 2PCF binning (even if only technical) and decide whether to rescale the 2PCF in the C++ code all_xi = (xi_table_11, xi_table_12, xi_table_22) xi_s_edges = None xi_n_mu_bins = None refine_xi = False for c, xi in enumerate(all_xi[:ncorr]): if c > 0: if type(xi) != type(all_xi[0]): raise TypeError(f"xi_table_{indices_corr[c]} must have the same type as xi_table_11") if xi.shape != all_xi[0].shape: raise ValueError(f"xi_table_{indices_corr[c]} must have the same shape as xi_table_11") if isinstance(xi, pycorr.twopoint_estimator.BaseTwoPointEstimator): refine_xi = True if xi.edges[1][0] < 0: xi = fix_bad_bins_pycorr(xi) print_and_log(f"Wrapping xi_table_{indices_corr[c]} to µ>=0") xi = xi.wrap() if c == 0: xi_n_mu_bins = xi.shape[1] xi_s_edges = xi.edges[0] elif not np.allclose(xi_s_edges, xi.edges[0]): raise ValueError("Different binning for different correlation functions not supported") if not np.allclose(xi.edges[1], np.linspace(0, 1, xi_n_mu_bins + 1)): raise ValueError(f"xi_table_{indices_corr[c]} µ binning is not consistent with linear between 0 and 1 (after wrapping)") write_xi_file(cornames[c], xi.sepavg(axis = 0), xi.sepavg(axis = 1), get_input_xi_from_pycorr(xi)) elif isinstance(xi, Iterable): if len(xi) == 4: # the last element is the edges refine_xi = True if c == 0: xi_s_edges = xi[-1] elif not np.allclose(xi_s_edges, xi[-1]): raise ValueError("Different binning for different correlation functions not supported") xi = xi[:-1] if len(xi) != 3: raise ValueError(f"xi_table {indices_corr[c]} must have 3 or 4 elements if a tuple/list") r_vals, mu_vals, xi_vals = xi if len(xi_vals) != len(r_vals): raise ValueError(f"xi_values {indices_corr[c]} must have the same number of rows as r_values") if len(xi_vals[0]) != len(mu_vals): raise ValueError(f"xi_values {indices_corr[c]} must have the same number of columns as mu_values") if c == 0: xi_n_mu_bins = len(mu_vals) if not refine_xi: xi_s_edges = (r_vals[:-1] + r_vals[1:]) / 2 # middle values as midpoints of r_vals to be safe xi_s_edges = [1e-4] + xi_s_edges + [2 * r_vals[-1] - xi_s_edges[-1]] # set the lowest edge near 0 and the highest beyond the last point of r_vals write_xi_file(cornames[c], r_vals, mu_vals, xi_vals) else: raise TypeError(f"Xi table {indices_corr[c]} must be either a pycorr.TwoPointEstimator or a tuple/list") xi_refinement_iterations *= refine_xi # True is 1; False is 0 => 0 iterations => no refinement # write the randoms file(s) randoms_positions = [randoms_positions1, randoms_positions2] randoms_weights = [randoms_weights1, randoms_weights2] randoms_samples = (randoms_samples1, randoms_samples2) for t, input_filename in enumerate(input_filenames): randoms_properties = pycorr.twopoint_counter._format_positions(randoms_positions[t], mode = "smu", position_type = position_type, dtype = np.float64) # list of x, y, z coordinate arrays; weights (and jackknife region numbers if any) will be appended if legendre_orig: randoms_positions[t] = np.array(randoms_properties) # save the formatted positions as an array for correction function computation nrandoms = len(randoms_properties[0]) if randoms_weights[t].ndim != 1: raise ValueError(f"Weights of randoms {t+1} not contained in a 1D array") if len(randoms_weights[t]) != nrandoms: raise ValueError(f"Number of weights for randoms {t+1} mismatches the number of positions") if normalize_wcounts: randoms_weights[t] /= np.sum(randoms_weights[t]) randoms_properties.append(randoms_weights[t]) if jackknife: if randoms_samples[t].ndim != 1: raise ValueError(f"Weights of sample labels {t+1} not contained in a 1D array") if len(randoms_samples[t]) != nrandoms: raise ValueError(f"Number of sample labels for randoms {t+1} mismatches the number of positions") randoms_properties.append(randoms_samples[t]) np.savetxt(input_filename, np.column_stack(randoms_properties)) randoms_properties = None # write the binning files binfile = os.path.join(out_dir, "radial_binning_cov.csv") write_binning_file(binfile, s_edges) binfile_cf = os.path.join(out_dir, "radial_binning_corr.csv") write_binning_file(binfile_cf, xi_s_edges) # Select the executable name exec_name = "bin/cov." + mode + "_jackknife" * jackknife + "_periodic" * periodic + "_verbose" * verbose # the above must be true relative to the script location # below we should make it absolute, i.e. right regardless of the working directory exec_path = os.path.join(os.path.realpath(os.path.dirname(__file__)), exec_name) # form the command line command = "env OMP_PROC_BIND=spread OMP_PLACES=threads " # set OMP environment variables, should not be set before command += f"{exec_path} -output {out_dir}/ -nside {sampling_grid_size} -rescale {coordinate_scaling} -nthread {nthread} -maxloops {n_loops} -loopspersample {loops_per_sample} -N2 {N2} -N3 {N3} -N4 {N4} -xicut {xi_cut_s} -binfile {binfile} -binfile_cf {binfile_cf} -mbin_cf {xi_n_mu_bins} -cf_loops {xi_refinement_iterations}" # here are universally acceptable parameters command += "".join([f" -in{suffixes_tracer[t]} {input_filenames[t]}" for t in range(ntracers)]) # provide all the random filenames command += "".join([f" -norm{suffixes_tracer[t]} {ndata[t]}" for t in range(ntracers)]) # provide all ndata for normalization command += "".join([f" -cor{suffixes_corr[c]} {cornames[c]}" for c in range(ncorr)]) # provide all correlation functions if legendre: # only provide max multipole l for now command += f" -max_l {max_l}" if legendre_mix: # generate and provide factors filename mu_bin_legendre_file = write_mu_bin_legendre_factors(n_mu_bins, max_l, os.path.join(out_dir, "weights")) command += f" -mu_bin_legendre_file {mu_bin_legendre_file}" if not legendre_orig: # provide binned pair counts files and number of mu bin command += "".join([f" -RRbin{suffixes_corr[c]} {binned_pair_names[c]}" for c in range(ncorr)]) + f" -mbin {n_mu_bins}" if periodic: # append periodic flag and box size command += f" -perbox -boxsize {boxsize}" if jackknife: # provide jackknife weight files for all correlations command += "".join([f" -jackknife{suffixes_corr[c]} {jackknife_weights_names[c]}" for c in range(ncorr)]) # compute the correction function if original Legendre if legendre_orig: # need correction function print_and_log(datetime.now()) print_and_log(f"Computing the correction function") if ntracers == 1: compute_correction_function(randoms_positions[0], randoms_weights[0], binfile, out_dir, periodic, binned_pair_names[0], print_and_log) elif ntracers == 2: compute_correction_function_multi(randoms_positions[0], randoms_weights[0], randoms_positions[1], randoms_weights[1], binfile, out_dir, periodic, *binned_pair_names, print_function = print_and_log) command += "".join([f" -phi_file{suffixes_corr[c]} {phi_names[c]}" for c in range(ncorr)]) # deal with the seed if seed is not None: # need to pass to the C++ code and make sure it can be received properly if not isinstance(seed, int): raise TypeError("Seed must be int or None") seed &= 2**32 - 1 # this bitwise AND truncates the seed into a 32-bit unsigned (positive) integer (definitely a subset of unsigned long) command += f" -seed {seed}" # run the main code print_and_log(datetime.now()) print_and_log(f"Launching the C++ code with command: {command}") status = os.system(f"bash -c 'set -o pipefail; stdbuf -oL -eL {command} 2>&1 | tee -a {logfile}'") # tee prints what it gets to stdout AND saves to file # stdbuf -oL -eL should solve the output delays due to buffering without hurting the performance too much # without pipefail, the exit_code would be of tee, not reflecting main command failures # feed the command to bash because on Ubuntu it was executed in sh (dash) where pipefail is not supported # clean up for input_filename in input_filenames: os.remove(input_filename) # delete the larger (temporary) input files for tmp_path in tmp_dirs_to_clean_up: rmdir_if_exists_and_empty(tmp_path) # safely remove the temporary directory and all its parents that did not exist before, but only if they are empty # check the run status exit_code = os.waitstatus_to_exitcode(status) # assumes we are in Unix-based OS; on Windows status is the exit code if exit_code: raise RuntimeError(f"The C++ code terminated with an error: exit code {exit_code}") print_and_log("The C++ code finished succesfully") # post-processing print_and_log(datetime.now()) print_and_log("Starting post-processing") if two_tracers: if legendre: from .post_process import post_process_legendre_multi results = post_process_legendre_multi(out_dir, n_r_bins, max_l, out_dir, shot_noise_rescaling1, shot_noise_rescaling2, skip_s_bins, skip_l, print_function = print_and_log) # multi-tracer Legendre with jackknife missing because it has not been used elif jackknife: from .post_process import post_process_jackknife_multi results = post_process_jackknife_multi(*xi_jack_names, os.path.dirname(jackknife_weights_names[0]), out_dir, n_mu_bins, out_dir, skip_s_bins, print_function = print_and_log) else: # default from .post_process import post_process_default_multi results = post_process_default_multi(out_dir, n_r_bins, n_mu_bins, out_dir, shot_noise_rescaling1, shot_noise_rescaling2, skip_s_bins, print_function = print_and_log) else: if legendre: if jackknife: from .post_process import post_process_legendre_mix_jackknife results = post_process_legendre_mix_jackknife(xi_jack_names[0], os.path.dirname(jackknife_weights_names[0]), out_dir, n_mu_bins, max_l, out_dir, skip_s_bins, skip_l, print_function = print_and_log) else: from .post_process import post_process_legendre results = post_process_legendre(out_dir, n_r_bins, max_l, out_dir, shot_noise_rescaling1, skip_s_bins, skip_l, print_function = print_and_log) elif jackknife: from .post_process import post_process_jackknife results = post_process_jackknife(xi_jack_names[0], os.path.dirname(jackknife_weights_names[0]), out_dir, n_mu_bins, out_dir, skip_s_bins, print_function = print_and_log) else: # default from .post_process import post_process_default results = post_process_default(out_dir, n_r_bins, n_mu_bins, out_dir, shot_noise_rescaling1, skip_s_bins, print_function = print_and_log) print_and_log("Finished post-processing") print_and_log(datetime.now()) print_and_log("Performing an extra convergence check") convergence_check_extra(results, print_function = print_and_log) print_and_log("Finished.") print_and_log(datetime.now()) return results