change: cal and pro to class;add: energy filter

This commit is contained in:
liuyihui 2022-07-27 00:05:06 +08:00
parent 543d5e106b
commit ad45620c0f
4 changed files with 311 additions and 169 deletions

View File

@ -1,100 +0,0 @@
import csv
import numpy as np
from qdx import Bind
from qdx.utils import readBlockData, get_hist
from tqdm import tqdm
from matplotlib import pyplot as plt
# Initialization
n, m = 5, 8
bias = 12.97
deltaE = 4
binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Data
file_list = csv.reader(open("./config1.csv", "r"))
pbar = tqdm(desc="Read Data E1", total=len(open("./config1.csv", "r").readlines()))
for row in file_list:
pn = int(row[1])
ldata, rdata = readBlockData(row[0], pn, m)
for i in range(m):
bind = binds[pn][i]
bind.add_data(0, ldata[i], rdata[i], 585 - float(row[2]))
pbar.update(1)
pbar.close()
file_list = csv.reader(open("./config2.csv", "r"))
pbar = tqdm(desc="Read Data E2", total=len(open("./config2.csv", "r").readlines()))
for row in file_list:
pn = int(row[1])
ldata, rdata = readBlockData(row[0], pn, m)
for i in range(m):
bind = binds[pn][i]
bind.add_data(1, ldata[i], rdata[i], 585 - float(row[2]))
pbar.update(1)
pbar.close()
# Data preprocessing
pbar = tqdm(desc="Bind Process", total=n * m)
for i in range(n):
for j in range(m):
bind: Bind = binds[i][j]
bind.slash()
bind.get_line()
bind.get_kb(bias, deltaE)
bind.draw_fit_line("result/FIT-LINE/" + bind.name + ".png")
bind.draw_cluster("result/GMM/" + bind.name + ".png")
bind.draw_peak("result/PEAK/" + bind.name + ".png")
pbar.update(1)
pbar.close()
# Fit
pbar = tqdm(desc="Bind Fit", total=n * m)
for i in range(n):
for j in range(m):
bind: Bind = binds[i][j]
bind.get_peak_center()
bind.fit_px()
pbar.update(1)
pbar.close()
# Draw check figure
pbar = tqdm(desc="Figure Check", total=n * m)
fig = plt.figure(figsize=(16, 10), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
peaks = np.array([])
for i in range(n):
for j in range(m):
bind = binds[i][j]
peaks = np.hstack((np.unique(bind.px[0]), peaks))
eng = bind.predict_energy(bind.x[0], bind.y[0])
pX = bind.predict_px(bind.x[0], bind.y[0])
count, center = get_hist(pX, delta=0.5)
ax1.scatter(pX, eng, s=0.1, color="k")
ax2.scatter(center, count + 2500 * (7 - j), s=0.5, color="k")
pbar.update(1)
peaks = np.unique(peaks)
for x in peaks:
ax2.vlines(x, 0, 20000, color="gray", linestyles="dashed")
for j in range(m):
ax2.hlines(2500 * j, -50, 600, color="r", linestyles="dashdot")
fig.savefig("./result/Check.png", facecolor="w", transparent=False)
plt.close()
pbar.close()
# Save coefficient to file
f = open("./coef.csv", "w")
for i in range(n):
for j in range(m):
bind = binds[i][j]
f.writelines(
"{:d},{:d},{:.9f},{:.9f},{:.9f},{:.9f},{:.9f}\n".format(
i, j, bind.k1, bind.k2, bind.b, bind.L, bind.C
)
)

View File

@ -1,69 +0,0 @@
import csv
import numpy as np
from qdx import Bind
from qdx.utils import readFileData, get_hist
from tqdm import tqdm
from matplotlib import pyplot as plt
# Initialization
n, m = 5, 8
binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Calibration Data
pbar = tqdm(desc="Bind Initialization", total=n * m)
data = list(csv.reader(open("coef1.csv", "r")))
data = np.array(data, dtype=np.float64)
for i in range(n):
for j in range(m):
bind = binds[i][j]
bind(data[j + i * m][2:])
pbar.update(1)
pbar.close()
# Read Data
total = len(open("task3.csv", "r").readlines()) * n * m
file_list = csv.reader(open("task3.csv", "r"))
pX = np.array([])
eng = np.array([])
pX_full = np.array([])
eng_full = np.array([])
pbar = tqdm(desc="Task - Mg25(d,p)Mg26*", total=total)
for row in file_list:
ldata, rdata = readFileData(row[0], n, m)
for i in range(n):
for j in range(m):
bind = binds[i][j]
x = bind.predict_px(ldata[j + i * m], rdata[j + i * m]) + float(row[1])
e = bind.predict_energy(ldata[j + i * m], rdata[j + i * m])
edge_l = 5 + 130 * i + float(row[1]) - 35
edge_r = edge_l + 65
idx = np.where((x >= edge_l) & (x <= edge_r))[0]
pX = np.hstack((pX, x[idx]))
eng = np.hstack((eng, e[idx]))
pX_full = np.hstack((pX_full, x))
eng_full = np.hstack((eng_full, e))
pbar.update(1)
pbar.close()
# Draw
fig = plt.figure(figsize=(20, 8), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
count, center = get_hist(pX, delta=0.1)
ax1.scatter(pX, eng, s=0.05, color="k")
py, = ax2.step(center, count, where="post", color="k")
ax1.set_xticks(np.arange((np.min(pX) // 50) * 50, (np.max(pX) // 50 + 1) * 50, 50))
ax2.set_xticks(np.arange((np.min(pX) // 50) * 50, (np.max(pX) // 50 + 1) * 50, 50))
fig.savefig("Task3.png")
plt.close()

160
qdx/calibration.py Normal file
View File

@ -0,0 +1,160 @@
import os
import csv
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from .Bind import Bind
from .utils import readBlockData, get_hist
class Calibration(object):
"""Calibrate the detector according to the calibration data
Parameters
----------
bias_b : float
bias $b = b_1 + b_2$
delta_e : float
delta energy between two beams
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
"""
def __init__(self, bias_b, delta_e, n=6, m=8):
self.bias = bias_b
self.delta_e = delta_e
self.n = n
self.m = m
self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
def __call__(self, file1, file2):
"""Calibration
Parameters
----------
file1/2 : str
data file path of energy 1/2
"""
# Read Data
file_list = csv.reader(open(file1, "r"))
pbar = tqdm(desc="Read Data E1", total=len(open(file1, "r").readlines()))
for row in file_list:
pn = int(row[1])
ldata, rdata = readBlockData(row[0], pn, self.m)
for i in range(self.m):
bind = self.binds[pn][i]
bind.add_data(0, ldata[i], rdata[i], float(row[2]))
pbar.update(1)
pbar.close()
file_list = csv.reader(open(file2, "r"))
pbar = tqdm(desc="Read Data E1", total=len(open(file2, "r").readlines()))
for row in file_list:
pn = int(row[1])
ldata, rdata = readBlockData(row[0], pn, self.m)
for i in range(self.m):
bind = self.binds[pn][i]
bind.add_data(1, ldata[i], rdata[i], float(row[2]))
pbar.update(1)
pbar.close()
# Data preprocessing
pbar = tqdm(desc="Bind Process", total=self.n * self.m)
for i in range(self.n):
for j in range(self.m):
bind: Bind = self.binds[i][j]
bind.slash()
bind.get_line()
bind.get_kb(self.bias, self.delta_e)
pbar.update(1)
pbar.close()
# Fit
pbar = tqdm(desc="Bind Fit", total=self.n * self.m)
for i in range(self.n):
for j in range(self.m):
bind: Bind = self.binds[i][j]
bind.get_peak_center()
bind.fit_px()
pbar.update(1)
pbar.close()
def draw_fit(self, path):
"""Draw fit result
Parameters
----------
path : str
save folder, there must be `Fit-line`, `GMM`, and `PEAK` 3 subfolders.
"""
for i in range(self.n):
for j in range(self.m):
bind: Bind = self.binds[i][j]
bind.draw_fit_line(os.path.join(path, "FIT-LINE", bind.name + ".png"))
bind.draw_cluster(os.path.join(path, "GMM", bind.name + ".png"))
bind.draw_peak(os.path.join(path, "PEAK", bind.name + ".png"))
def draw_check(self, path="Check.png"):
"""Draw check figure
Parameters
----------
path : str, optional
save path
"""
pbar = tqdm(desc="Draw Figure Check", total=self.n * self.m)
fig = plt.figure(figsize=(24, 15), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
peaks = np.array([])
for i in range(self.n):
for j in range(self.m):
bind = self.binds[i][j]
peaks = np.hstack((np.unique(bind.px[0]), peaks))
eng = bind.predict_energy(bind.x[0], bind.y[0])
pX = bind.predict_px(bind.x[0], bind.y[0])
count, center = get_hist(pX, delta=0.5)
ax1.scatter(pX, eng, s=0.1, color="k")
ax2.scatter(center, count + 3000 * (7 - j), s=0.5, color="k")
pbar.update(1)
peaks = np.unique(peaks)
for x in peaks:
ax2.vlines(x, 0, 3000 * self.m, color="gray", linestyles="dashed")
for j in range(self.m):
ax2.hlines(
2500 * j,
(np.min(peaks) // 50) * 50,
(np.min(peaks) // 50 + 1) * 50,
color="r",
linestyles="dashdot",
)
fig.savefig(path, facecolor="w", transparent=False)
plt.close()
pbar.close()
def save(self, path="coef.csv"):
"""Save coefficient to file
Parameters
----------
path : str, optional
save path
"""
f = open(path, "w")
for i in range(self.n):
for j in range(self.m):
bind = self.binds[i][j]
f.writelines(
"{:d},{:d},{:.9f},{:.9f},{:.9f},{:.9f},{:.9f}\n".format(
i, j, bind.k1, bind.k2, bind.b, bind.L, bind.C
)
)

151
qdx/process.py Normal file
View File

@ -0,0 +1,151 @@
import csv
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from .Bind import Bind
from .fit import fit_line
from .model import Linear1D
from .utils import readFileData, get_hist
class Process(object):
"""Process the experimental data according to the calibration results.
Parameters
----------
path : str
coefficient file
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
"""
def __init__(self, path, n=6, m=8) -> None:
self.n = n
self.m = m
self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Calibration Data
pbar = tqdm(desc="Bind Initialization", total=n * m)
data = list(csv.reader(open(path, "r")))
data = np.array(data, dtype=np.float64)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
bind(data[j + i * m][2:])
pbar.update(1)
pbar.close()
def read_data(self, file):
"""Read Process Data
file : str
task file path
"""
# Read Data
n, m = self.n, self.m
total = len(open(file, "r").readlines()) * self.n * self.m
file_list = csv.reader(open(file, "r"))
self.pX = np.array([])
self.eng = np.array([])
pbar = tqdm(desc="Read Data", total=total)
for row in file_list:
ldata, rdata = readFileData(row[0], n, m)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
x = bind.predict_px(ldata[j + i * m], rdata[j + i * m]) + float(
row[1]
)
e = bind.predict_energy(ldata[j + i * m], rdata[j + i * m])
edge_l = 5 + 130 * i + float(row[1]) - 35
edge_r = edge_l + 65
idx = np.where((x >= edge_l) & (x <= edge_r))[0]
self.pX = np.hstack((self.pX, x[idx]))
self.eng = np.hstack((self.eng, e[idx]))
pbar.update(1)
pbar.close()
def energy_filter(self, lower, upper, sigma=5.0, maxiters=5):
"""Fit px - E line and do sigma clip iteratively.
Parameters
----------
lower/upper : float
Upper and lower bounds on the initial filter
sigma: float, optional
The number of standard deviations to use for both the lower and upper clipping limit.
maxiters: int or None, optional
The maximum number of sigma-clipping iterations to perform or None to clip until convergence is achieved.
If convergence is achieved prior to maxiters iterations, the clipping iterations will stop.
"""
model = Linear1D()
idx = np.where((self.eng >= lower) & (self.eng <= upper))[0]
x, y = self.pX[idx], self.eng[idx]
for i in range(maxiters):
reg = fit_line(model, x, y)
err = np.abs(y - reg(x))
idx = np.where(err <= sigma * np.std(err))[0]
if len(idx) == len(x):
break
x, y = x[idx], y[idx]
self.pX_n = x
self.eng_n = y
self.reg = reg
def draw_filter(self, path="filter.png"):
"""Draw the energy filter result
Parameters
----------
path : str, optional
save path
"""
px_min = (np.min(self.pX_n) // 50) * 50
px_max = (np.max(self.pX_n) // 50 + 1) * 50
px_x = np.linspace(px_min, px_max, int(px_max - px_min))
fig = plt.figure(figsize=(20, 8), dpi=200)
ax = fig.add_subplot(1, 1, 1)
ax.scatter(self.pX, self.eng, s=0.01, color="black")
ax.scatter(self.pX_n, self.eng_n, s=0.01, color="orange")
ax.plot(px_x, self.reg(px_x))
fig.savefig(path)
plt.close()
def draw_result(self, path="result.png"):
"""Draw the processing result
Parameters
----------
path : str, optional
save path
"""
fig = plt.figure(figsize=(20, 8), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
count, center = get_hist(self.pX_n, delta=0.1)
ax1.scatter(self.pX_n, self.eng_n, s=0.05, color="k")
ax2.step(center, count, where="post", color="k")
px_min = (np.min(self.pX_n) // 50) * 50
px_max = (np.max(self.pX_n) // 50 + 1) * 50
ax1.set_xticks(np.arange(px_min, px_max, 50))
ax2.set_xticks(np.arange(px_min, px_max, 50))
fig.savefig(path)
plt.close()