"""
- The script is meant to calculate material properties for a material
by performing inverse analysis on experimental results using the
constitutive model with phase field fracture here implemented in
FEniCS
- This script contains only the inverse analysis procedure with
a simple steepest descent algo while the FEM simulation and the
related settings are implemented in other scripts (see WST.py)
- The objective function is embedded in a class that creates it
taking as input the FEM model file, the material data and the
labels of the parameters to identify
"""
#%% ---------------------------------------------------------------70
# Import necessary packages
# -----------------------------------------------------------------70
import os
import sys
import time
from datetime import datetime
import numpy as np
#%% ---------------------------------------------------------------70
# Control parameters
# -----------------------------------------------------------------70
material_data = "lab_test_240416T1520"
experimental_data_file = material_data + "_Fd_data.csv"
FEM_file = "WST.py"
parameters_labels = ["Emod"]
# initial guess, consistent with the parameter choice in the simulation script
initial_guess = [1.3e9]
# set the limits of the parameter space
limits = [
[0.8e9, 50e9]
]
# perturbation used for the finite-difference gradient of the objective function
perturbation = [0.01e9]
# scale factors applied to the gradient components (one per parameter)
scale_factor = [1e15]
# column identifiers for the simulation output file (must match the bracketed labels in its header)
interested_cols_identifiers = ["open mm", "HRF kN"]
# initialize step length
alpha = 0.1
# allow the step length alpha to vary during the procedure
explore_alpha_variation_flag = 1
# factors used to try keeping, increasing and decreasing alpha
exploration_factors_list = [1, 2, 0.5]
# tolerance for the minimization of the objective function
tolerance = 0.1
# limit on the number of FEM simulations for the minimization algo
max_sims_counter = 50
# allow the objective function to increase at most this many times before exiting
max_objective_function_increase_counter = 10
# maximum number of inverse analysis iterations
max_iterations = 100
# specify if you want to timestamp the output or not
timestamp_flag = 1
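# Sketch of the update performed below: the gradient is estimated by forward
# finite differences with the perturbations above, scaled by scale_factor, and
# the steepest descent step is clipped to the limits, i.e.
#   grad[i] ~ ( f(params + perturbation[i]*e_i) - f(params) )/perturbation[i]
#   params  <- clip( params - alpha*scale_factor*grad, limits )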
#%% ---------------------------------------------------------------70
# Filesystem settings
# -----------------------------------------------------------------70
folder_descriptor = "Inverse_Analysis_" + material_data
now = datetime.now()
dt_string = now.strftime("%Y%m%dT%H%M")[2:]
if timestamp_flag:
outfolder = dt_string + "_" + folder_descriptor
else:
outfolder = folder_descriptor
# create output folder
os.system("mkdir " + outfolder)
# copy this file to the inverse analysis folder for forensics
os.system("cp " + __file__ + " ./" + outfolder + "/" +
__file__.split("/")[-1].replace(".py", "") +
"_" + dt_string + ".py" )
# copy the sim file to the inverse analysis folder for forensics
os.system("cp " + FEM_file + " ./" + outfolder + "/" +
FEM_file.split("/")[-1].replace(".py", "") +
"_" + dt_string + ".py" )
# copy the material data file to the inverse analysis folder for forensics
os.system("cp " + material_data + ".py" +
" ./" + outfolder + "/" + material_data + "_" + dt_string + ".py")
# inverse analysis output file
outfile = outfolder + "/" + "README.md"
# prompt for forensic
readme = input("Describe why to run this inverse analysis procedure: \n")
with open(outfile, "w") as file:
file.writelines(readme)
#%% ---------------------------------------------------------------70
# Objective function and helper definitions
# -----------------------------------------------------------------70
def fileprint(string):
    """Print a message and append it to the README output file."""
    print(string)
    with open(outfile, "a") as file:
        file.write("\n" + string)
def retrieve_info(file, interested_cols_identifiers):
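    """Return the requested columns of a simulation output file as a dict.

    The second line of the file is assumed to hold one bracketed label per
    column, e.g. "[open mm] [HRF kN]" (hypothetical layout); the position of
    a label among the brackets gives the column index passed to np.genfromtxt,
    which then reads that column skipping the two header lines.
    """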
    # read the header line (second line of the file) that holds the column labels
with open(file, "r") as file_opener:
head = [next(file_opener) for _ in range(3)]
header_string = head[1]
# identify columns corresponding to interested quantities
interested_cols = {}
character_counter = 0
word_counter = -1
close = -1
for character in header_string:
character_counter += 1
if character == "[":
init = character_counter
elif character == "]":
close = character_counter
identifier = header_string[init:close-1]
word_counter += 1
if identifier in interested_cols_identifiers:
interested_cols[identifier] = word_counter
computed_data = {}
for identifier in interested_cols_identifiers:
computed_data[identifier] = np.genfromtxt(
file,
skip_header = 2,
skip_footer = 2,
usecols = interested_cols[identifier]
)
return computed_data
class objective_function_creator():
def __init__(self, parameters_labels, FEM_file, experimental_data, material_data, interested_cols_identifiers, outfolder):
self.parameters_labels = parameters_labels
self.FEM_file = FEM_file
self.experimental_data = experimental_data
self.material_data = material_data
self.sims_counter = 0
self.interested_cols_identifiers = interested_cols_identifiers
self.outfolder = outfolder
self.nparams = len(parameters_labels)
    def evaluate(self, params):
        """Inverse analysis objective function.

        Input: the parameters to optimize for (the FEM file, the material
        data and the experimental data are stored in the instance).
        Output: a single scalar, the square root of the sum of the squared
        differences between the experimental and the computed force at
        each simulation time step.
        """
# pack the parameters in an identification string
sim_ID = ""
for i in range(self.nparams):
sim_ID += self.parameters_labels[i] + "_" + str(params[i]) + "_"
# check if simulation was already run
sim_already_run_flag = 0
for file in os.listdir(self.outfolder):
if file == sim_ID + ".md":
sim_already_run_flag = 1
# run sim if necessary
if sim_already_run_flag == 0:
            # print a message to monitor the inverse analysis procedure
            print("running sim: " + sim_ID)
# launch the FEM with the input parameters
os.system("python " + self.FEM_file + " " + self.material_data + " " + sim_ID + " " + self.outfolder)
# update simulations counter
self.sims_counter += 1
            # check completion: wait for the flag file written by the FEM script
            completed_flag = 0
            while completed_flag == 0:
                for file in os.listdir("./" + self.outfolder + "/"):
                    if file == "simulation_completed.md":
                        completed_flag = 1
                if completed_flag == 0:
                    # avoid a busy-wait while the FEM script is still running
                    time.sleep(1)
            # delete the flag file
            os.system("rm " + self.outfolder + "/" + "simulation_completed.md")
# read result file
computed_data = retrieve_info(self.outfolder + "/" + sim_ID + ".md", self.interested_cols_identifiers)
        # interpolate the experimental data at the simulation time steps
        interpolated_experimental_data = np.interp(
            computed_data["open mm"],
            self.experimental_data[:,0],
            self.experimental_data[:,1]
        )*1e-3
        # mind the consistency of the units of measure: the 1e-3 and 1e3
        # factors serve this purpose
        obj_func_val = 1e3*np.sqrt(np.sum(np.square(
            computed_data["HRF kN"] - interpolated_experimental_data
        )))
return obj_func_val
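# Minimal usage sketch of the class above (hypothetical values; the actual
# instantiation happens in the optimization section below):
#   obj_func = objective_function_creator(parameters_labels, FEM_file,
#       experimental_data, material_data, interested_cols_identifiers, outfolder)
#   value = obj_func.evaluate(np.array([1.3e9]))  # runs one FEM sim, returns a scalar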
def update_params(parameters_array, increment, limits):
    # update the parameters, keeping them within the given limits
    # note: the addition is out of place so the caller's array is not
    # modified (the += operator would update the input in place !!!)
    parameters_array = parameters_array + increment
    for ii in range(len(parameters_array)):
        parameters_array[ii] = max(limits[ii][0], parameters_array[ii])
        parameters_array[ii] = min(limits[ii][1], parameters_array[ii])
    return parameters_array
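# e.g. with limits = [[0.8e9, 50e9]], an increment that would push Emod below
# 0.8e9 or above 50e9 is clipped to the corresponding bound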
def forensic_printout(sims_counter, alpha, params, obj_func_val, gradient, update):
    # printout the elapsed wall-clock time in minutes
    info_string = f"{int((time.time() - tic)/60):^5}\t"
# printout number of simulations run
info_string += f"{sims_counter:^5}\t"
# printout alpha value
info_string += f"{alpha:^10.5g}\t"
# printout parameters
for param in params:
info_string += f"{param:^10.5g}\t"
# printout objective function value
info_string += f"{obj_func_val:^10.5g}\t"
    # printout the gradient components
    for ii in range(nparams):
        try:
            info_string += f"{gradient[ii]:^10.5g}\t"
        except (TypeError, ValueError):
            # the dummy gradient holds '-' placeholders, which do not accept
            # the float format specifier
            info_string += f"{gradient[ii]:^10}\t"
    # printout the update flag (or its '-' placeholder)
    info_string += f"{update:^10}\t"
fileprint( info_string )
#%% ---------------------------------------------------------------70
# Messages to the user
# -----------------------------------------------------------------70
# number of parameters
nparams = len(parameters_labels)
# README file heading
info_string = f"{'[min]':^5}\t" +\
f"{'[sim N]':^5}\t" +\
f"{'[alpha]':^10}\t"
for parameter in parameters_labels:
parameter = '[' + parameter + ']'
info_string += f"{parameter:^10}\t"
info_string += f"{'[obj_val]':^10}\t"
for ii in range(nparams):
info_string += f"{'[grad ' + str(ii) + ']':^10}\t"
info_string += f"{'[update]':^10}\t"
fileprint( info_string )
# create a dummy gradient list of '-' placeholders for the forensic printout
dummy_grad = ['-']*nparams
#%% ---------------------------------------------------------------70
# Objective function optimization
# -----------------------------------------------------------------70
# read experimental data file
experimental_data = np.loadtxt( experimental_data_file )
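# the file is assumed to be whitespace-delimited, with the opening (mm) in the
# first column and the force in the second; pass delimiter="," to np.loadtxt
# if it is actually comma-separated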
# create the objective function specifying the material data
objective_function = objective_function_creator(
parameters_labels,
FEM_file,
experimental_data,
material_data,
interested_cols_identifiers,
outfolder
)
# setup control variables for the minimization algo
params = np.array(initial_guess)
scale_factor = np.array(scale_factor)
perturbation_matrix = np.diag(perturbation)
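# each column of the diagonal perturbation matrix perturbs a single parameter,
# as required by the forward finite-difference gradient computed below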
update_counter = 0
obj_func_val_min = np.inf
objective_function_increase_counter = 0
iteration = 0
# measure elapsed time for inverse analysis procedure
tic = time.time()
# minimize objective function
while iteration <= max_iterations:
iteration += 1
if objective_function.sims_counter > max_sims_counter:
fileprint("Max number of allowed simulations reached.")
break
# compute objective function
obj_func_val = objective_function.evaluate(params)
# check if tolerance has been reached
if obj_func_val < tolerance:
# gradient evaluation is not necessary
forensic_printout(objective_function.sims_counter, alpha, params, obj_func_val, dummy_grad, update_counter)
fileprint("Tolerance reached, procedure exited successfully !!!")
break
# check that objective function is not increasing
if obj_func_val >= obj_func_val_min:
objective_function_increase_counter += 1
if objective_function_increase_counter > max_objective_function_increase_counter:
fileprint(f"Objective function has increased {objective_function_increase_counter} times, exiting inverse analysis procedure.")
break
# update historic value of objective function
obj_func_val_min = min(obj_func_val, obj_func_val_min)
# start gradient computation by initializing empty gradient list
gradient = []
for ii in range(nparams):
# compute perturbed params using perturbation
# perturbed_params = params + perturbation_matrix[:,ii]
perturbed_params = update_params(params, perturbation_matrix[:,ii], limits)
# compute objective function value at perturbed parameters
perturbed_params_obj_func_val = objective_function.evaluate(perturbed_params)
# printout for forensic at each objective function evaluation
forensic_printout(objective_function.sims_counter, alpha, perturbed_params, perturbed_params_obj_func_val, dummy_grad, '-')
# update the gradient list
gradient.append( ( perturbed_params_obj_func_val - obj_func_val )/perturbation[ii] )
    # apply the scale factors and store the gradient in a numpy array
    gradient = np.multiply( scale_factor, np.array(gradient) )
    # normalize the gradient in the multi-parameter case
# if nparams > 1:
# gradient = gradient/np.linalg.norm(gradient)
# printout for forensic after the gradient calculation
forensic_printout(objective_function.sims_counter, alpha, params, obj_func_val, gradient, update_counter)
# update the counter after the gradient calculation
update_counter += 1
# update alpha if necessary
if explore_alpha_variation_flag:
exploration = {}
exploration["sims_counter"] = []
exploration["alpha"] = []
exploration["params"] = []
exploration["obj_func_val"] = []
exploration["update"] = []
        # keep the current alpha as a reference so the exploration factors
        # do not compound across the loop
        alpha_base = alpha
        for factor in exploration_factors_list:
            alpha = factor*alpha_base
# exploration_params = params - alpha*gradient
exploration_params = update_params(params, -alpha*gradient, limits)
exploration_obj_func_val = objective_function.evaluate(exploration_params)
# save data for identification and forensic
exploration["sims_counter"].append(objective_function.sims_counter)
exploration["alpha"].append(alpha)
exploration["params"].append(exploration_params)
exploration["obj_func_val"].append(exploration_obj_func_val)
exploration["update"].append('-')
        # identify which alpha value corresponds to the minimum objective function
min_index = np.argmin(exploration["obj_func_val"])
# update alpha with value corresponding to min objective function
alpha = exploration["alpha"][min_index]
# flag the chosen alpha
exploration["update"][min_index] = update_counter
for ii in range(len(exploration_factors_list)):
# printout for forensic at each objective function evaluation
forensic_printout(
exploration["sims_counter"][ii],
exploration["alpha"][ii],
exploration["params"][ii],
exploration["obj_func_val"][ii],
dummy_grad,
exploration["update"][ii]
)
# update parameters with the steepest descent algo
# params += -alpha*gradient
params = update_params(params, -alpha*gradient, limits)
if iteration > max_iterations:
    fileprint("Max iterations reached")
fileprint(f"Elapsed CPU time: {int((time.time() - tic)/60):5d} [min]")