Source code for microstructpy.verification

"""Verification

This module contains functions related to mesh verification.

"""

# --------------------------------------------------------------------------- #
#                                                                             #
# Import Modules                                                              #
#                                                                             #
# --------------------------------------------------------------------------- #

from __future__ import division
from __future__ import print_function

import copy
import os

import numpy as np
import scipy.stats
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D

import microstructpy.geometry
from microstructpy import _misc
from microstructpy import seeding

__author__ = 'Kenneth (Kip) Hart'

styles_vec = ['o', '^', 's', 'D', 'v', '*']

hist_class = scipy.stats._continuous_distns.rv_histogram
discr_rvs = scipy.stats._discrete_distns._distn_names
conti_rvs = scipy.stats._continuous_distns._distn_names

ori_deg_kws = ['orientation', 'angle', 'angle_deg']
ori_rad_kws = ['angle_rad']


# --------------------------------------------------------------------------- #
#                                                                             #
# Volume Fractions                                                            #
#                                                                             #
# --------------------------------------------------------------------------- #
[docs]def volume_fractions(poly_mesh, n_phases): """Verify volume fractions This function computes the volume fractions of each phase in the output mesh. It does so by summing the volumes of the cells in the polygonal mesh. Args: poly_mesh (PolyMesh): The polygonal/polyhedral mesh. n_phases (int): Number of phases. Returns: numpy.ndarray: Volume fractions of each phase in the poly mesh. """ region_phases = np.array(poly_mesh.phase_numbers, dtype='int') region_volumes = np.array(poly_mesh.volumes, dtype='float') vol_list = np.full(n_phases, 0, dtype='float') for i in range(n_phases): mask = np.isclose(region_phases, i) vol_list[i] = np.sum(region_volumes[mask]) v_total = np.sum(vol_list) if v_total == 0: return vol_list return vol_list / v_total
[docs]def write_volume_fractions(vol_fracs, phases, filename='volume_fractions.txt'): """Write volume fractions to a file Write the volume fractions verification out to a file. The output columns are: 1. Phase number 2. Phase name 3. Input relative volume (average, if distributed) 4. Output relative volume 5. Input volume fraction (average, if distributed) 6. Output volume fraction The first three lines of the output file are headings. Args: vol_fracs (list or numpy.ndarray): Volume fractions of the output mesh. phases (list): List of phase dictionaries. filename (str): *(optional)* Name of file to write. Defaults to ``volume_fractions.txt``. Returns: none, prints formatted volume fraction verification table to file """ # Sample input volume distributions n_inp = 5000 vol_trials = np.full((n_inp, len(phases)), -1, dtype='float') for i, phase in enumerate(phases): mask = vol_trials[:, i] < 0 while np.any(mask): new_vals = _safe_rvs(phase.get('fraction', 1), np.sum(mask)) vol_trials[mask, i] = new_vals mask = vol_trials[:, i] < 0 # Determine input and output relative volumes vols_exp = np.array([_safe_mean(p.get('fraction', 1)) for p in phases]) vols_act = np.sum(vols_exp) * np.array(vol_fracs) # Determine input volume fractions vol_total = np.sum(vol_trials, axis=1) k = 1 / vol_total frac_trials = vol_trials * k.reshape(-1, 1) vf_exp = np.mean(frac_trials, axis=0) # Create table hdr1 = ['', '', 'Input (Avg.)', 'Output', 'Input (Avg.)', 'Output'] hdr2 = ['#', 'Name', 'Relative Volume', 'Relative Volume', 'Volume Fraction', 'Volume Fraction'] rows = [hdr1, hdr2] for i, phase in enumerate(phases): name = phase.get('name', 'Material ' + str(i + 1)) row = [i, name, vols_exp[i], vols_act[i], vf_exp[i], vol_fracs[i]] rows.append(row) # Write table file_dir = os.path.dirname(filename) if not os.path.exists(file_dir): os.makedirs(file_dir) with open(filename, 'w') as file: file.write(_fixed_width_table(rows, [0, 2, 3, 4, 5]))
[docs]def plot_volume_fractions(vol_fracs, phases, filename='volume_fractions.png'): """Plot volume fraction verification This function creates a bar chart comparing the input and output volume fractions. If the input volume fraction is distributed, the top of the bar will be a curve representing the CDF of the distribution. Args: vol_fracs (list or numpy.ndarray): Output volume fractions. phases (list): List of phase dictionaries filename (str or list): *(optional)* Filename(s) to save the plot. Defaults to ``volume_fractions.png``. Returns: none, writes plot to file. """ # Sample input volume distributions n_inp = 5000 vol_trials = np.full((n_inp, len(phases)), -1, dtype='float') for i, phase in enumerate(phases): mask = vol_trials[:, i] < 0 while np.any(mask): new_vals = _safe_rvs(phase.get('fraction', 1), np.sum(mask)) vol_trials[mask, i] = new_vals mask = vol_trials[:, i] < 0 # Determine input volume fractions vol_total = np.sum(vol_trials, axis=1) k = 1 / vol_total frac_trials = vol_trials * k.reshape(-1, 1) # Initialize plot plt.clf() total_w = 0.8 bar_w = 0.5 * total_w n_plt = 201 q_plt = np.linspace(0, 1, n_plt) x_cens = np.arange(len(phases)) # Plot input volume fraction bars for i, phase in enumerate(phases): x_plt = x_cens[i] - bar_w + bar_w * q_plt y_plt = np.quantile(frac_trials[:, i], q_plt) x_plt = np.concatenate((x_plt, [x_plt[-1], x_plt[0]])) y_plt = np.concatenate((y_plt, [0, 0])) if i == 0: plt.fill(x_plt, y_plt, 'white', edgecolor='k', label='Input', zorder=3) else: plt.fill(x_plt, y_plt, 'white', edgecolor='k', zorder=3) # Plot output volume fraction bars plt.bar(x_cens + 0.5 * bar_w, vol_fracs, width=bar_w, label='Actual', color='gray', edgecolor='black', zorder=4) # Format plot xticks = range(len(phases)) xlbls = [p.get('name', 'Material ' + str(i + 1)) for i, p in enumerate(phases)] plt.xticks(xticks, xlbls) plt.gca().set_ylim(bottom=0) plt.grid(axis='y') plt.legend(loc='lower center', ncol=2, bbox_to_anchor=(0.5, -0.155), fancybox=True) plt.ylabel('Volume Fraction') plt.title('Volume Fraction Verification') if not isinstance(filename, list): filename = [filename] for fname in filename: write_dir = os.path.dirname(fname) if not os.path.exists(write_dir): os.makedirs(write_dir) plt.savefig(fname)
# --------------------------------------------------------------------------- # # # # Seeds of Best Fit # # # # --------------------------------------------------------------------------- #
[docs]def seeds_of_best_fit(seeds, phases, pmesh, tmesh): """Calculate seed geometries of best fit This function computes the seeds of best fit for the resultant polygonal and triangular meshes. It calls the the ``best_fit`` function of each seed's geometry, then copies the other seed attributes to create a new :class:`.SeedList`. The points on the faces of the grains are used to determine a fit geometry. Points on the exterior of the domain are not used since they would alter the shape of the best fit seed. Args: seeds (SeedList): List of seed geometries. phases (list): List of material phases. See :ref:`phase_dict_guide` for more information on formatting. pmesh (PolyMesh): Resultant polygonal/polyhedral mesh. tmesh (TriMesh): Resultant triangular/tetrahedral mesh. Returns: SeedList: List of seeds of best fit. .. note: If the geometry of best fit for the seed is ill-conditioned (e.g. negative axes for an ellipse), the seed geometry is set to None. """ poly_pts = np.array(pmesh.points, dtype='float') poly_neighs = np.array(pmesh.facet_neighbors, dtype='int') tri_parents = np.array(tmesh.facet_attributes, dtype='int') tri_facets = np.array(tmesh.facets, dtype='int') tri_pts = np.array(tmesh.points, dtype='float') seed_nums = np.array(pmesh.seed_numbers, dtype='int') poly_neighs[poly_neighs < 0] = -1 tri_facet_neigh_cells = poly_neighs[tri_parents] tri_facet_neigh_seeds = seed_nums[tri_facet_neigh_cells] tri_facet_neigh_seeds[tri_facet_neigh_cells < 0] = -1 poly_facet_neigh_seeds = seed_nums[poly_neighs] poly_facet_neigh_seeds[poly_neighs < 0] = -1 tri_facet_is_ext = np.min(tri_facet_neigh_seeds, axis=-1) < 0 poly_facet_is_ext = np.min(poly_facet_neigh_seeds, axis=-1) < 0 n_dim = seeds[0].geometry.n_dim fit_seeds = [] for i, seed in enumerate(seeds): p = seed.phase mat_type = phases[p].get('material_type', 'crystalline') if mat_type in _misc.kw_solid: mask = np.any(tri_facet_neigh_seeds == i, axis=-1) mask &= ~tri_facet_is_ext seed_facets = tri_facets[mask] kps = np.unique(seed_facets) if len(kps) <= n_dim: mask = np.any(tri_facet_neigh_seeds == i, axis=-1) seed_facets = tri_facets[mask] kps = np.unique(seed_facets) seed_pts = tri_pts[kps] if (mat_type not in _misc.kw_solid) or (len(kps) <= n_dim): mask = np.any(poly_facet_neigh_seeds == i, axis=-1) mask &= ~poly_facet_is_ext seed_facets = [f for f, m in zip(pmesh.facets, mask) if m] kps = np.unique([kp for f in seed_facets for kp in f]) if len(kps) <= n_dim: mask = np.any(poly_facet_neigh_seeds == i, axis=-1) seed_facets = [f for f, m in zip(pmesh.facets, mask) if m] kps = np.unique([kp for f in seed_facets for kp in f]) seed_pts = poly_pts[kps.astype('int')] try: fit_geom = seed.geometry.best_fit(seed_pts) except ValueError: fit_geom = None fit_seed = copy.deepcopy(seed) # Check for ill-conditioned seeds of best fit if _ill_conditioned(fit_geom): fit_seed.geometry = None else: fit_seed.geometry = fit_geom fit_seeds.append(fit_seed) return seeding.SeedList(fit_seeds)
def _ill_conditioned(geom): if isinstance(geom, microstructpy.geometry.Ellipse): return min(geom.axes) <= 0 return False # --------------------------------------------------------------------------- # # # # Compare to Phases # # # # --------------------------------------------------------------------------- #
[docs]def plot_distributions(seeds, phases, dirname='.', ext='png', poly_mesh=None, verif_mask=None): """Plot comparison between input and output distributions This function takes seeds and compares them against the input phases. A polygon mesh can be included for cases where grains are given an area or volume distribution, rather than size/shape/etc. This function creates both PDF and CDF plots. Args: seeds (SeedList): List of seeds to compare. phases (list): List of phase dictionaries. dirname (str): *(optional)* Plot output directory. Defaults to ``.``. ext (str or list): *(optional)* File extension(s) of the output plots. Defaults to ``'png'``. poly_mesh (PolyMesh): *(optional)* Polygonal mesh, useful for phases with an area or volume distribution. Returns: none, creates plot files. """ if verif_mask is None: verif_mask = np.full(len(seeds), True) # Get geometry of seeds comp_phases = _phase_values(seeds, phases, poly_mesh, verif_mask) # Determine all geometry keywords used kws = set([kw for phase in comp_phases for kw in phase.keys()]) kws -= set(_misc.gen_kws) # Create Plots n_dim = len(seeds[0].position) for kw in sorted(kws): if (kw in _misc.ori_kws) and (n_dim == 3): continue # PDF plot plt.clf() line_colors = [] line_labels = [] ymax = 0 for i, phase in enumerate(phases): if (kw not in comp_phases[i]) or (kw not in phase): continue # Get input PDF ymax_inp_pdf = _plot_inp_pdf(kw, i, phase) ymax = max(ymax, ymax_inp_pdf) # Get output PDF comp_phase = comp_phases[i] ymax_out_pdf, lcs, lls = _plot_out_pdf(kw, i, phase, comp_phase) ymax = max(ymax, ymax_out_pdf) line_colors.extend(lcs) line_labels.extend(lls) # Format PDF plot color_legend = plt.legend(line_colors, line_labels, loc=4) style_labels = ['Input', 'Actual'] dashed_line = Line2D([0], [0], color='k', ls=':') solid_line = Line2D([0], [0], color='k', ls='-') style_lines = [dashed_line, solid_line] style_labels = ['Input', 'Actual'] style_legend = plt.legend(style_lines, style_labels, loc=2) plt.gca().add_artist(style_legend) plt.gca().add_artist(color_legend) plt.grid(True) xlbl = ' '.join([s.capitalize() for s in kw.split('_')]) xlbl = xlbl.replace('Rad', '(radians)').replace('Deg', '(degrees)') xlbl = xlbl.replace('Orientation', 'Orientation (degrees)') plt.xlabel(xlbl) plt.ylabel('Probability Density Function') plt.ylim([0, 1.1 * ymax]) plt.title('PDF Comparison') # Save PDF plot if not isinstance(ext, list): ext = [ext] basename = kw + '_pdf.' filepath = os.path.join(dirname, basename) for extension in ext: filename = filepath + extension plt.savefig(filename) plt.close() # CDF Plot plt.clf() line_colors = [] line_labels = [] for i, phase in enumerate(phases): if (kw not in comp_phases[i]) or (kw not in phase): continue # Get input CDF _plot_inp_cdf(kw, i, phase) # Get output PDF comp_phase = comp_phases[i] lcs, lls = _plot_out_cdf(kw, i, phase, comp_phase) line_colors.extend(lcs) line_labels.extend(lls) # Format CDF plot color_legend = plt.legend(line_colors, line_labels, loc=4) style_legend = plt.legend(style_lines, style_labels, loc=2) plt.gca().add_artist(style_legend) plt.gca().add_artist(color_legend) plt.grid(True) xlbl = ' '.join([s.capitalize() for s in kw.split('_')]) xlbl = xlbl.replace('Rad', '(radians)').replace('Deg', '(degrees)') xlbl = xlbl.replace('Orientation', 'Orientation (degrees)') plt.xlabel(xlbl) plt.ylabel('Cumulative Distribution Function') plt.ylim([0, 1]) plt.title('CDF Comparison') # Save PDF plot if not isinstance(ext, list): ext = [ext] basename = kw + '_cdf.' filepath = os.path.join(dirname, basename) for extension in ext: filename = filepath + extension plt.savefig(filename) plt.close()
def _plot_inp_pdf(kw, i, phase): ymax = 0 inp_dist = phase[kw] color = phase.get('color', 'C' + str(i % 10)) if kw in ori_deg_kws and phase[kw] == 'random': x_plt = [0, 360] y_plt = [1 / 360, 1 / 360] plt.plot(x_plt, y_plt, color=color, ls=':') ymax = 1 / 360 elif kw in ori_rad_kws and phase[kw] == 'random': x_plt = [0, 2 * np.pi] y_plt = [0.5 / np.pi, 0.5 / np.pi] plt.plot(x_plt, y_plt, color=color, ls=':') ymax = y_plt[0] elif phase[kw] == 'random': e_str = 'Cannot create PDF for random setting' e_str += ' of keyword <' + str(kw) + '>' raise NotImplementedError(e_str) elif kw == 'orientation': ct = inp_dist[0][0] st = inp_dist[1][0] inp_deg = np.rad2deg(np.arctan2(st, ct)) plt.plot([inp_deg, inp_deg], [0, 1e12], color=color, ls=':') elif isinstance(inp_dist, list): for j, dist in enumerate(inp_dist): try: lb = dist.ppf(1e-3) ub = dist.ppf(1 - 1e-3) x_plt = np.linspace(lb, ub, 51) y_plt = dist.pdf(x_plt) ymax = max(ymax, np.max(y_plt)) except AttributeError: x_plt = [dist, dist] y_plt = [0, 1e12] m = styles_vec[j % len(styles_vec)] plt.plot(x_plt, y_plt, color=color, ls=':', marker=m) else: try: lb = inp_dist.ppf(1e-3) ub = inp_dist.ppf(1 - 1e-3) x_plt = np.linspace(lb, ub, 51) y_plt = inp_dist.pdf(x_plt) ymax = np.max(y_plt) except AttributeError: x_plt = [inp_dist, inp_dist] y_plt = [0, 1e12] plt.plot(x_plt, y_plt, color=color, ls=':') return ymax def _plot_inp_cdf(kw, i, phase): quants = np.linspace(0, 1, 501)[1:-1] quants_lr = np.linspace(0, 1, 21)[1:-1] inp_dist = phase[kw] color = phase.get('color', 'C' + str(i % 10)) if kw in ori_deg_kws and phase[kw] == 'random': x_plt = [0, 360] y_plt = [0, 1] plt.plot(x_plt, y_plt, color=color, ls=':') elif kw in ori_rad_kws and phase[kw] == 'random': x_plt = [0, 2 * np.pi] y_plt = [0, 1] plt.plot(x_plt, y_plt, color=color, ls=':') elif phase[kw] == 'random': e_str = 'Cannot create CDF for random setting' e_str += ' of keyword <' + str(kw) + '>' raise NotImplementedError(e_str) elif kw == 'orientation': ct = inp_dist[0][0] st = inp_dist[1][0] inp_deg = np.rad2deg(np.arctan2(st, ct)) plt.plot([inp_deg, inp_deg], [0, 1], color=color, ls=':') elif isinstance(inp_dist, list): for j, dist in enumerate(inp_dist): try: x_plt = dist.ppf(quants) y_plt = quants x_plt_lr = dist.ppf(quants_lr) y_plt_lr = quants_lr except AttributeError: x_plt = np.full(11, dist) y_plt = np.linspace(0, 1, 11) x_plt_lr = x_plt y_plt_lr = y_plt m = styles_vec[j % len(styles_vec)] plt.plot(x_plt, y_plt, color=color, ls=':') plt.plot(x_plt_lr, y_plt_lr, color=color, marker=m) else: try: x_plt = inp_dist.ppf(quants) y_plt = quants except AttributeError: x_plt = [inp_dist, inp_dist] y_plt = [0, 1] plt.plot(x_plt, y_plt, color=color, ls=':') def _plot_out_pdf(kw, i, phase, comp_phase): line_colors = [] line_labels = [] ymax = 0 inp_dist = phase[kw] comp_vals = np.array([v for v in comp_phase[kw] if v is not None]) color = phase.get('color', 'C' + str(i % 10)) name = str(phase.get('name', 'Material ' + str(i + 1))) if kw == 'orientation': comp_ct = comp_vals[:, 0, 0] comp_st = comp_vals[:, 1, 0] comp_vals = np.rad2deg(np.arctan2(comp_st, comp_ct)) ys, xbs, _ = plt.hist(comp_vals, density=True, histtype='step', color=color) ymax = max(ys) line_colors.append(Line2D([0], [0], color=color)) line_labels.append(name) elif isinstance(inp_dist, list): for j, vals in enumerate(comp_vals): ys, xbs, _ = plt.hist(vals, density=True, histtype='step', color=color) ymax = max(ymax, np.max(ys)) xs = 0.5 * (xbs[:-1] + xbs[1:]) m = styles_vec[j % len(styles_vec)] plt.plot(xs, ys, color=color, marker=m) line_colors.append(Line2D([0], [0], color=color, marker=m)) line_labels.append(name + '[' + str(j) + ']') else: ys, xbs, _ = plt.hist(comp_vals, bins=20, density=True, histtype='step', color=color) ymax = max(ymax, np.max(ys)) line_colors.append(Line2D([0], [0], color=color)) line_labels.append(name) return ymax, line_colors, line_labels def _plot_out_cdf(kw, i, phase, comp_phase): quants = np.linspace(0, 1, 501) line_colors = [] line_labels = [] inp_dist = phase[kw] comp_vals = np.array([v for v in comp_phase[kw] if v is not None]) color = phase.get('color', 'C' + str(i % 10)) name = str(phase.get('name', 'Material ' + str(i + 1))) if kw == 'orientation': comp_ct = comp_vals[:, 0, 0] comp_st = comp_vals[:, 1, 0] comp_vals = np.rad2deg(np.arctan2(comp_st, comp_ct)) x_plt = np.quantile(comp_vals, quants) y_plt = quants plt.plot(x_plt, y_plt, color=color) line_colors.append(Line2D([0], [0], color=color)) line_labels.append(name) elif isinstance(inp_dist, list): for j, vals in enumerate(comp_vals): x_plt = np.quantile(vals, quants) y_plt = quants m = styles_vec[j % len(styles_vec)] plt.plot(x_plt, y_plt, color=color, marker=m) line_colors.append(Line2D([0], [0], color=color, marker=m)) line_labels.append(name + '[' + str(j) + ']') else: x_plt = np.quantile(comp_vals, quants) y_plt = quants plt.plot(x_plt, y_plt, color=color) line_colors.append(Line2D([0], [0], color=color)) line_labels.append(name) return line_colors, line_labels # --------------------------------------------------------------------------- # # # # Maximum Likelihood Estimators for Phases # # # # --------------------------------------------------------------------------- #
[docs]def mle_phases(seeds, phases, poly_mesh=None, verif_mask=None): """Get maximum likelihood estimators (MLEs) for phases This function finds distributions in the list of phases and computes the MLE parameters for those distributions. The returned value is a list of phases with the same length and dictionary keywords, except the distributions are replaced with MLE distributions (based on the seeds). Constant values are replaced with the mean of the seed values. Note that the directional statistics are not used - so the results for orientation angles and matrices are unreliable. Also note that SciPy currently does not support MLEs for discrete random variables. Any discrete distributions will be given a histogram output. .. note:: Directional statistics are not used and as such the results for orientation angles and matrices are unreliable. The only exception is normally distributed orientation angles. Args: seeds (SeedList): List of seeds. phases (list): List of input phase dictionaries. poly_mesh (PolyMesh): *(optional)* Polygonal/polyhedral mesh. verif_mask (list or numpy.ndarray): *(optional)* Mask for which seeds to include in the MLE parameter calculation. Default is True for all seeds. """ circ_highs = {'angle': 360, 'angle_deg': 360, 'angle_rad': 2 * np.pi} if verif_mask is None: verif_mask = np.full(len(seeds), True) # Get geometry of seeds comp_phases = _phase_values(seeds, phases, poly_mesh, verif_mask) # Get MLEs param_phases = [] for comp_phase, phase in zip(comp_phases, phases): param_phase = {} for kw in comp_phase: comp_vals = np.array([v for v in comp_phase[kw] if v is not None]) inp_dist = phase[kw] can_circ = 'angle' in kw and hasattr(inp_dist, 'dist') if can_circ: can_circ &= inp_dist.dist.name == 'norm' if can_circ: high = circ_highs[kw] circ_mean = scipy.stats.circmean(comp_vals, high=high) circ_scl = scipy.stats.circstd(comp_vals, high=high) dist = scipy.stats.norm(loc=circ_mean, scale=circ_scl) else: dist = _mle_dist(comp_vals, inp_dist) param_phase[kw] = dist param_phases.append(param_phase) return param_phases
[docs]def write_mle_phases(inp_phases, out_phases, filename='mles.txt'): """Write MLE parameters in a table This function writes out a text file containing the input parameters and maximum likelihood estimators (MLEs) for the outputs. Args: inp_phases (list): List of input phase dictionaries. out_phases (list): List of output phase dictionaries. filename (str): *(optional)* Filename of the output table. Defaults to ``mles.txt``. Returns: none, writes file. """ # Create table rows as dictionaries rows_dict = [] assert len(inp_phases) == len(out_phases) for i in range(len(inp_phases)): inp_phase = inp_phases[i] out_phase = out_phases[i] name = inp_phase.get('name', 'Material ' + str(i + 1)) kws = set(out_phase.keys()) - set(_misc.gen_kws) for kw in sorted(kws): inp_dist = inp_phase[kw] out_dist = out_phase[kw] if kw == 'orientation': row_dict = {'i': i, 'name': name, 'kw': kw} rows_dict.append(row_dict) continue if isinstance(inp_dist, list): for j in range(len(inp_dist)): row_dict = {'i': i, 'name': name, 'kw': kw + '[' + j + ']'} inp_dict = _dist_dict(inp_dist[j]) out_dict = _dist_dict(out_dist[j]) for key in inp_dict: row_dict[key + '_inp'] = inp_dict[key] row_dict[key + '_out'] = out_dict[key] rows_dict.append(row_dict) else: row_dict = {'i': i, 'name': name, 'kw': kw} inp_dict = _dist_dict(inp_dist) out_dict = _dist_dict(out_dist) for key in inp_dict: row_dict[key + '_inp'] = inp_dict[key] row_dict[key + '_out'] = out_dict[key] rows_dict.append(row_dict) # Determine headings / order of dictionaries hdg_kws = set([kw for row in rows_dict for kw in row.keys()]) init_kws = ['i', 'name', 'kw'] loc_scale_kws = ['loc_inp', 'loc_out', 'scale_inp', 'scale_out'] all_kws = [kw for kw in init_kws] for kw in sorted(list(hdg_kws)): if kw in init_kws or kw in loc_scale_kws: continue all_kws.append(kw) for kw in loc_scale_kws: if kw in hdg_kws: all_kws.append(kw) # create list rows rows_list = [[d.get(kw, '') for kw in all_kws] for d in rows_dict] # create headers hdr = _mle_hdr(all_kws) table = hdr + rows_list rjust = [i for i, kw in enumerate(all_kws) if kw not in ['name', 'kw']] # Save table out_dir = os.path.dirname(filename) if out_dir and not os.path.exists(out_dir): os.makedirs(out_dir) with open(filename, 'w') as file: file.write(_fixed_width_table(table, rjust))
def _mle_hdr(all_kws): hdr1 = [] hdr2 = [] for kw in all_kws: if kw == 'i': h1 = '' h2 = '#' elif kw == 'name': h1 = '' h2 = 'Name' elif kw == 'kw': h1 = '' h2 = 'Parameter' elif kw.endswith('_inp'): h1 = 'Input' h2 = kw.rstrip('_inp') elif kw.endswith('_out'): h1 = 'Output' h2 = kw.rstrip('_out') else: raise ValueError('Cannot creating heading for keyword ' + str(kw)) hdr1.append(h1) hdr2.append(h2) return [hdr1, hdr2] # --------------------------------------------------------------------------- # # # # Phase Error Statistics # # # # --------------------------------------------------------------------------- #
[docs]def error_stats(fit_seeds, seeds, phases, poly_mesh=None, verif_mask=None): """Error statistics for seeds This function creates a dictionary of error statistics for each of the input distributions in the phases. Args: fit_seeds (SeedList): List of seeds of best fit. seeds (SeedList): List of seeds. phases (list): List of input phase dictionaries. poly_mesh (PolyMesh): *(optional)* Polygonal/polyhedral mesh. verif_mask (list or numpy.ndarray): *(optional)* Mask for seeds to be included in the analysis. Defaults to all True. Returns: list: List with the same size and dictionary keywords as phases, but with error statistics dictionaries in each entry. """ if verif_mask is None: verif_mask = np.full(len(seeds), True) # Organize the geometry values init_phases = _phase_values(seeds, phases, verif_mask=verif_mask) outp_phases = _phase_values(fit_seeds, phases, poly_mesh, verif_mask) err_phases = [] for i in range(len(phases)): i_phase = init_phases[i] o_phase = outp_phases[i] phase = phases[i] for kw in phase: if kw in ('angle', 'angle_deg') and phase[kw] == 'random': phase[kw] = scipy.stats.uniform(loc=0, scale=360) if kw == 'angle_rad': phase[kw] = scipy.stats.uniform(loc=0, scale=2 * np.pi) err_io = {kw: _kw_errs(i_phase[kw], o_phase[kw]) for kw in i_phase} err_po = {kw: _kw_stats(phase[kw], o_phase[kw]) for kw in o_phase} err_phase = {} for kw in i_phase: if kw == 'orientation': err_phase[kw] = {} continue val = err_io[kw].copy() val.update(err_po[kw]) err_phase[kw] = val err_phases.append(err_phase) return err_phases
def _kw_errs(y_exp, y_act): if np.array(y_exp).ndim > 1: return [_kw_errs(*tup) for tup in zip(y_exp, y_act)] errs = {} mask = np.array([y_a is not None for y_a in y_act]) if not np.any(mask): return errs y_expect = np.array(y_exp)[mask] y_actual = np.array([y_a for y_a in y_act if y_a is not None]) r = y_actual - y_expect # MAE mae = np.mean(np.abs(r)) errs['mae'] = mae # MSE mse = np.mean(r * r) errs['mse'] = mse # RMSE rmse = np.sqrt(mse) errs['rmse'] = rmse # R^2 coeff_det = _r2(y_actual, y_expect) errs['R^2'] = coeff_det # Max Error inf_norm = np.max(np.abs(r)) errs['inf_norm'] = inf_norm return errs def _r2(y_act, y_exp): r = y_act - y_exp mse = np.mean(r * r) y_bar = np.mean(y_act) r_ybar = y_act - y_bar mse_baseline = np.mean(r_ybar * r_ybar) coeff_det = 1 - (mse / mse_baseline) return coeff_det def _kw_stats(dist_exp, y_act): if isinstance(dist_exp, list): return [_kw_stats(*tup) for tup in zip(dist_exp, y_act)] stats = {} y_actual = np.array([y_a for y_a in y_act if y_a is not None]) if len(y_actual) == 0: return stats y_pred = _safe_rvs(dist_exp, 5000) # Wasserstein Distance wass = scipy.stats.wasserstein_distance(y_actual, y_pred) stats['wasserstein_distance'] = wass # Energy Distance e_dist = scipy.stats.energy_distance(y_actual, y_pred) stats['energy_distance'] = e_dist # K-S Test if hasattr(dist_exp, 'cdf'): cdf_func = dist_exp.cdf else: def cdf_func(x): return (x > dist_exp).astype('float') ks_stat, ks_p = scipy.stats.kstest(y_actual, cdf_func) stats['ks_statistic'] = ks_stat stats['ks_p_value'] = ks_p # T-test t_stat, t_p = scipy.stats.ttest_ind(y_actual, y_pred) stats['t_stat'] = t_stat stats['t_p_value'] = t_p return stats
[docs]def write_error_stats(errs, phases, filename='error_stats.txt'): """Write error statistics to file This function takes previously computed error statistics and writes them to a human-readable text file. Args: errs (list): List of error statistics for each input phase parameter. Organized the same as ``phases``. phases (list): List of input phases. See :ref:`phase_dict_guide` for more details. filename (str): *(optional)* The name of the file to contain the error statistics. Defaults to ``error_stats.txt``. """ # Create table rows as dictionaries rows_dict = [] assert len(errs) == len(phases) for i in range(len(errs)): err_dict = errs[i] phase = phases[i] name = phase.get('name', 'Material ' + str(i + 1)) kws = set(err_dict.keys()) - set(_misc.gen_kws) for kw in kws: err_metrics = err_dict[kw] inp_dist = phase[kw] if isinstance(inp_dist, list): for j in range(len(inp_dist)): row_dict = {'i': i, 'name': name, 'kw': kw + '[' + j + ']'} row_dict.update(err_metrics[j]) rows_dict.append(row_dict) else: row_dict = {'i': i, 'name': name, 'kw': kw} row_dict.update(err_metrics) rows_dict.append(row_dict) # Determine headings / order of dictionaries hdg_kws = set([kw for row in rows_dict for kw in row.keys()]) init_kws = ['i', 'name', 'kw'] all_kws = [kw for kw in init_kws] for kw in sorted(list(hdg_kws)): if kw not in init_kws: all_kws.append(kw) # create list rows rows_list = [[d.get(kw, '') for kw in all_kws] for d in rows_dict] # create headers hdr1 = [] hdr2 = [] hd1 = {'i': '', 'name': '', 'kw': '', 'energy_distance': 'Energy', 'inf_norm': 'Maximum', 'ks_p_value': 'K-S', 'ks_statistic': 'K-S', 'mae': 'Mean Average', 'mse': 'Mean Squared', 'rmse': 'Root Mean', 't_p_value': 't-test', 't_stat': 't-test', 'wasserstein_distance': 'Wasserstein'} hd2 = {'i': '#', 'name': 'Name', 'kw': 'Parameter', 'energy_distance': 'Distance', 'inf_norm': 'Absolute Error', 'ks_p_value': 'p-value', 'ks_statistic': 'Statistic', 'mae': 'Error', 'mse': 'Error', 'rmse': 'Squared Error', 't_p_value': 'p-value', 't_stat': 'Statistic', 'wasserstein_distance': 'Distance'} for kw in all_kws: hdr1.append(hd1.get(kw, '')) hdr2.append(hd2.get(kw, kw)) table = [hdr1, hdr2] + rows_list rjust = [i for i, kw in enumerate(all_kws) if kw not in ['name', 'kw']] # Save table out_dir = os.path.dirname(filename) if out_dir and not os.path.exists(out_dir): os.makedirs(out_dir) with open(filename, 'w') as file: file.write(_fixed_width_table(table, rjust))
# --------------------------------------------------------------------------- # # # # Private Functions # # # # --------------------------------------------------------------------------- # def _safe_mean(x): try: mu = x.mean() except AttributeError: mu = x return mu def _safe_rvs(x, size=1): if isinstance(x, list): return np.array([_safe_rvs(xi, size) for xi in x]).T try: samples = x.rvs(size=size) except AttributeError: samples = np.full(size, x) return samples def _phase_values(seeds, phases, poly_mesh=None, verif_mask=None): """Takes the properties of the seeds and organizes them like the phases """ if verif_mask is None: verif_mask = np.full(len(seeds), True) if poly_mesh is not None: cell_vols = np.array(poly_mesh.volumes) vols = [] for i in range(len(seeds)): mask = np.isclose(poly_mesh.seed_numbers, i) vols.append(np.sum(cell_vols[mask])) comp_phases = [] for i, phase in enumerate(phases): comp_phase = {} phase_seeds = [s for f, s in zip(verif_mask, seeds) if s.phase == i and f] for kw in phase: if 'rot_seq' in kw: continue if kw in _misc.gen_kws: continue try: if kw in ('area', 'volume') and (poly_mesh is not None): vals = [v for v, s, f in zip(vols, seeds, verif_mask) if s.phase == i and f] else: vals = [_getattr(s.geometry, kw) for s in phase_seeds] comp_phase[kw] = vals except AttributeError: pass comp_phases.append(comp_phase) return comp_phases def _getattr(inst, kw): try: a = getattr(inst, kw) except AttributeError: a = None return a def _mle_dist(values, dist): if isinstance(dist, list): return [_mle_dist(*tup) for tup in zip(values, dist)] if not (hasattr(dist, 'dist') or isinstance(dist, hist_class)): return np.mean(values) if isinstance(dist, hist_class) or dist.dist.name in discr_rvs: hist = np.histogram(values, bins='auto', density=True) return scipy.stats.rv_histogram(hist) args = dist.dist.fit(values) kwargs = {'loc': args[-2], 'scale': args[-1]} name = dist.dist.name return scipy.stats.__dict__[name](*args[:-2], **kwargs) def _dist_dict(dist): if not (hasattr(dist, 'dist') or isinstance(dist, hist_class)): return {'loc': dist} if isinstance(dist, hist_class) or dist.dist.name in discr_rvs: return {} args = dist.args kwds = dist.kwds shape_vals, loc, scale = dist.dist._parse_args(*args, **kwds) param_dict = {'loc': loc, 'scale': scale} if len(shape_vals) == 0: return param_dict shape_kwds = dist.dist.shapes.split(',') for val, kwd in zip(shape_vals, shape_kwds): param_dict[kwd.strip().lower()] = val return param_dict def _fixed_width_table(rows, rjust_cols=[]): # heading row hr = None for i, row in enumerate(rows): if all([isinstance(col, str) for col in row]): hr = i # Convert rows to strings str_rows = [] for row in rows: str_row = [] for col in row: if isinstance(col, float): if col == 0 or np.abs(np.log10(np.abs(col))) < 3: col_str = '{:f}'.format(col) else: col_str = '{:e}'.format(col) else: col_str = str(col) str_row.append(col_str) str_rows.append(str_row) # Determine column widths col_widths = [max([len(s) for s in col]) for col in zip(*str_rows)] # Add divider if hr is not None: str_rows.insert(hr + 1, [w * '-' for w in col_widths]) # Create table table = '' for i, row in enumerate(str_rows): row_strings = [] for j, col in enumerate(row): w = col_widths[j] if j in rjust_cols and i > hr + 1: row_strings.append(col.rjust(w)) else: row_strings.append(col.ljust(w)) table += ' '.join(row_strings) + '\n' return table