Q3D-Calibration/qdx/process.py

132 lines
4.3 KiB
Python

import csv
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from .Bind import Bind
from .fit import fit_line
from .model import Linear1D
from .utils import readFileData, get_hist
class Process(object):
"""Process the experimental data according to the calibration results."""
def __init__(self) -> None:
pass
def __call__(self, coef, task, n=6, m=8):
"""Read Process Data
coef : str
coefficient file
task : str
task file
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
"""
# Initialization
self.n, self.m = n, m
self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Calibration Data
pbar = tqdm(desc="Bind Initialization", total=n * m)
data = list(csv.reader(open(coef, "r")))
data = np.array(data, dtype=np.float64)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
bind(data[j + i * m][2:])
pbar.update(1)
pbar.close()
# Read Data
total = len(open(task, "r").readlines()) * n * m
file_list = csv.reader(open(task, "r"))
self.pX = np.array([])
self.eng = np.array([])
pbar = tqdm(desc="Read Data", total=total)
for row in file_list:
ldata, rdata = readFileData(row[0], n, m)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
x = bind.predict_px(ldata[j + i * m], rdata[j + i * m]) + float(
row[1]
)
e = bind.predict_energy(ldata[j + i * m], rdata[j + i * m])
edge_l = 5 + 130 * i + float(row[1]) - 35
edge_r = edge_l + 65
idx = np.where((x >= edge_l) & (x <= edge_r))[0]
self.pX = np.hstack((self.pX, x[idx]))
self.eng = np.hstack((self.eng, e[idx]))
pbar.update(1)
pbar.close()
def energy_filter(self, lower, upper, sigma=5.0, maxiters=5):
"""Fit px - E line and do sigma clip iteratively.
Parameters
----------
lower/upper : float
Upper and lower bounds on the initial filter
sigma: float, optional
The number of standard deviations to use for both the lower and upper clipping limit.
maxiters: int or None, optional
The maximum number of sigma-clipping iterations to perform or None to clip until convergence is achieved.
If convergence is achieved prior to maxiters iterations, the clipping iterations will stop.
"""
model = Linear1D()
idx = np.where((self.eng >= lower) & (self.eng <= upper))[0]
x, y = self.pX[idx], self.eng[idx]
for i in range(maxiters):
reg = fit_line(model, x, y)
err = np.abs(y - reg(x))
idx = np.where(err <= sigma * np.std(err))[0]
if len(idx) == len(x):
break
x, y = x[idx], y[idx]
self.pX_n = x
self.eng_n = y
self.reg = reg
def draw_result(self, path="result.png"):
"""Draw the processing result
Parameters
----------
path : str, optional
save path
"""
fig = plt.figure(figsize=(24, 12), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
count, center = get_hist(self.pX_n, step=0.1)
ax1.scatter(self.pX, self.eng, s=0.01, color="black")
ax1.scatter(self.pX_n, self.eng_n, s=0.01, color="orange")
ax2.step(center, count, where="post", color="k")
px_min = (np.min(self.pX_n) // 50) * 50
px_max = (np.max(self.pX_n) // 50 + 1) * 50
px_x = np.linspace(px_min, px_max, int(px_max - px_min))
ax1.plot(px_x, self.reg(px_x))
ax1.set_xticks(np.arange(px_min, px_max, 50))
ax2.set_xticks(np.arange(px_min, px_max, 50))
ax1.set_xlabel("x (mm)")
ax1.set_ylabel("Energy (MeV)")
ax2.set_xlabel("x (mm)")
ax2.set_ylabel("Count per bin")
fig.savefig(path, facecolor="w", transparent=False)
plt.close()