import csv import numpy as np from tqdm import tqdm from matplotlib import pyplot as plt from .Bind import Bind from .fit import fit_line from .model import Linear1D from .utils import readFileData, get_hist class Process(object): """Process the experimental data according to the calibration results. Parameters ---------- path : str coefficient file n : int, optional number of blocks, default 6 m : int, optional number of binds, default 8 """ def __init__(self, path, n=6, m=8) -> None: self.n = n self.m = m self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)] # Read Calibration Data pbar = tqdm(desc="Bind Initialization", total=n * m) data = list(csv.reader(open(path, "r"))) data = np.array(data, dtype=np.float64) for i in range(n): for j in range(m): bind = self.binds[i][j] bind(data[j + i * m][2:]) pbar.update(1) pbar.close() def read_data(self, file): """Read Process Data file : str task file path """ # Read Data n, m = self.n, self.m total = len(open(file, "r").readlines()) * self.n * self.m file_list = csv.reader(open(file, "r")) self.pX = np.array([]) self.eng = np.array([]) pbar = tqdm(desc="Read Data", total=total) for row in file_list: ldata, rdata = readFileData(row[0], n, m) for i in range(n): for j in range(m): bind = self.binds[i][j] x = bind.predict_px(ldata[j + i * m], rdata[j + i * m]) + float( row[1] ) e = bind.predict_energy(ldata[j + i * m], rdata[j + i * m]) edge_l = 5 + 130 * i + float(row[1]) - 35 edge_r = edge_l + 65 idx = np.where((x >= edge_l) & (x <= edge_r))[0] self.pX = np.hstack((self.pX, x[idx])) self.eng = np.hstack((self.eng, e[idx])) pbar.update(1) pbar.close() def energy_filter(self, lower, upper, sigma=5.0, maxiters=5): """Fit px - E line and do sigma clip iteratively. Parameters ---------- lower/upper : float Upper and lower bounds on the initial filter sigma: float, optional The number of standard deviations to use for both the lower and upper clipping limit. maxiters: int or None, optional The maximum number of sigma-clipping iterations to perform or None to clip until convergence is achieved. If convergence is achieved prior to maxiters iterations, the clipping iterations will stop. """ model = Linear1D() idx = np.where((self.eng >= lower) & (self.eng <= upper))[0] x, y = self.pX[idx], self.eng[idx] for i in range(maxiters): reg = fit_line(model, x, y) err = np.abs(y - reg(x)) idx = np.where(err <= sigma * np.std(err))[0] if len(idx) == len(x): break x, y = x[idx], y[idx] self.pX_n = x self.eng_n = y self.reg = reg def draw_filter(self, path="filter.png"): """Draw the energy filter result Parameters ---------- path : str, optional save path """ px_min = (np.min(self.pX_n) // 50) * 50 px_max = (np.max(self.pX_n) // 50 + 1) * 50 px_x = np.linspace(px_min, px_max, int(px_max - px_min)) fig = plt.figure(figsize=(20, 8), dpi=200) ax = fig.add_subplot(1, 1, 1) ax.scatter(self.pX, self.eng, s=0.01, color="black") ax.scatter(self.pX_n, self.eng_n, s=0.01, color="orange") ax.plot(px_x, self.reg(px_x)) ax.set_xlabel("x (mm)") ax.set_ylabel("Energy (MeV)") fig.savefig(path) plt.close() def draw_result(self, path="result.png"): """Draw the processing result Parameters ---------- path : str, optional save path """ fig = plt.figure(figsize=(20, 8), dpi=200) ax1 = fig.add_subplot(2, 1, 1) ax2 = fig.add_subplot(2, 1, 2) count, center = get_hist(self.pX_n, delta=0.1) ax1.scatter(self.pX_n, self.eng_n, s=0.05, color="k") ax2.step(center, count, where="post", color="k") px_min = (np.min(self.pX_n) // 50) * 50 px_max = (np.max(self.pX_n) // 50 + 1) * 50 ax1.set_xticks(np.arange(px_min, px_max, 50)) ax2.set_xticks(np.arange(px_min, px_max, 50)) ax1.set_xlabel("x (mm)") ax1.set_ylabel("Energy (MeV)") ax2.set_xlabel("x (mm)") ax2.set_ylabel("Count per bin") fig.savefig(path) plt.close()