Q3D-Calibration/qdx/process.py

132 lines
4.3 KiB
Python
Raw Normal View History

import csv
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from .Bind import Bind
from .fit import fit_line
from .model import Linear1D
from .utils import readFileData, get_hist
class Process(object):
"""Process the experimental data according to the calibration results."""
def __init__(self) -> None:
pass
def __call__(self, coef, task, n=6, m=8):
"""Read Process Data
coef : str
coefficient file
task : str
task file
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
"""
# Initialization
self.n, self.m = n, m
self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Calibration Data
pbar = tqdm(desc="Bind Initialization", total=n * m)
data = list(csv.reader(open(coef, "r")))
data = np.array(data, dtype=np.float64)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
bind(data[j + i * m][2:])
pbar.update(1)
pbar.close()
# Read Data
total = len(open(task, "r").readlines()) * n * m
file_list = csv.reader(open(task, "r"))
self.pX = np.array([])
self.eng = np.array([])
pbar = tqdm(desc="Read Data", total=total)
for row in file_list:
ldata, rdata = readFileData(row[0], n, m)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
x = bind.predict_px(ldata[j + i * m], rdata[j + i * m]) + float(
row[1]
)
e = bind.predict_energy(ldata[j + i * m], rdata[j + i * m])
edge_l = 5 + 130 * i + float(row[1]) - 35
edge_r = edge_l + 65
idx = np.where((x >= edge_l) & (x <= edge_r))[0]
self.pX = np.hstack((self.pX, x[idx]))
self.eng = np.hstack((self.eng, e[idx]))
pbar.update(1)
pbar.close()
def energy_filter(self, lower, upper, sigma=5.0, maxiters=5):
"""Fit px - E line and do sigma clip iteratively.
Parameters
----------
lower/upper : float
Upper and lower bounds on the initial filter
sigma: float, optional
The number of standard deviations to use for both the lower and upper clipping limit.
maxiters: int or None, optional
The maximum number of sigma-clipping iterations to perform or None to clip until convergence is achieved.
If convergence is achieved prior to maxiters iterations, the clipping iterations will stop.
"""
model = Linear1D()
idx = np.where((self.eng >= lower) & (self.eng <= upper))[0]
x, y = self.pX[idx], self.eng[idx]
for i in range(maxiters):
reg = fit_line(model, x, y)
err = np.abs(y - reg(x))
idx = np.where(err <= sigma * np.std(err))[0]
if len(idx) == len(x):
break
x, y = x[idx], y[idx]
self.pX_n = x
self.eng_n = y
self.reg = reg
def draw_result(self, path="result.png"):
"""Draw the processing result
Parameters
----------
path : str, optional
save path
"""
fig = plt.figure(figsize=(24, 12), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
2022-09-06 13:45:45 +08:00
count, center = get_hist(self.pX_n, step=0.1)
ax1.scatter(self.pX, self.eng, s=0.01, color="black")
ax1.scatter(self.pX_n, self.eng_n, s=0.01, color="orange")
ax2.step(center, count, where="post", color="k")
px_min = (np.min(self.pX_n) // 50) * 50
px_max = (np.max(self.pX_n) // 50 + 1) * 50
px_x = np.linspace(px_min, px_max, int(px_max - px_min))
ax1.plot(px_x, self.reg(px_x))
ax1.set_xticks(np.arange(px_min, px_max, 50))
ax2.set_xticks(np.arange(px_min, px_max, 50))
2022-07-27 09:36:15 +08:00
ax1.set_xlabel("x (mm)")
ax1.set_ylabel("Energy (MeV)")
ax2.set_xlabel("x (mm)")
ax2.set_ylabel("Count per bin")
fig.savefig(path, facecolor="w", transparent=False)
plt.close()