move to wsl

liuyihui 2022-11-22 17:29:48 +08:00
parent 5385cef9dc
commit 893507e340
9 changed files with 905 additions and 911 deletions

.gitignore vendored

@@ -1,26 +1,20 @@
# data file
*.root
*.log
*.csv
*.json
*.png
*.txt
2016Q3D/
result/
!requirements.txt
# env
venv/
# build cache
__pycache__
# config
.vscode
.vs
# dev
*.ipynb
test*
*.code-workspace
# data file
*.root
*.log
*.csv
*.json
*.png
*.txt
data/
result/
!requirements.txt
# build cache
__pycache__
# config
.vscode
# dev
*.ipynb
test*


@@ -1,3 +1,3 @@
from .Bind import Bind
from .process import Process
from .calibration import Calibration
from .Bind import Bind
from .process import Process
from .calibration import Calibration


@@ -1,296 +1,296 @@
import numpy as np
import matplotlib.pyplot as plt
from .utils import get_hist, GMM_clip
from .model import Linear1D, FixedSlopeLine, pXLine
from .fit import fit_line, fit_hist_gaussian
class Bind(object):
"""Bind of a Block, using data under two energy conditions for fitting.
The energy of an ADC device can be expressed as: $E = kx + b$.
So the complete energy is represented by $E(x,y) = k_1x + b_1 + k_2y + b_2$.
This formula can be converted to: $y = -\\frac{k_1}{k_2}x + \\frac{E-b_1-b_2}{k_2}$.
Plotting the data in a 2D plane and fitting a line, we can get $K_i = -\\frac{k_1}{k_2}$ and $C_i$.
Using data at the two energies $E_1$ and $E_2$, $k_2 = \\frac{E_1 - E_2}{C_1 - C_2}$.
The incident position produces different responses in the left and right detectors, which can be expressed as:
$Px = \\frac{k_2y - k_1x}{E}L+CL$.
Attributes
----------
L/C : float
parameters in $Px = \\frac{k_2y - k_1x}{E}L+CL$.
k1, k2 : float
calibration slopes in $E(x,y) = k_1x + k_2y + b$.
b : float
$b = b_1 + b_2$
K1/K2/C1/C2: float
$K_i = -\\frac{k_1}{k_2}$
$C_i = \\frac{E_i-b}{k_2}$
"""
def __init__(self, n, m):
self.n, self.m = n, m
self.px = [None, None]
self.x, self.y = [None, None], [None, None]
self.L, self.C = 40, 0
self.k1, self.k2, self.b = 0, 0, 0
self.K1, self.C1, self.K2, self.C2 = 0, 0, 0, 0
def add_data(self, k, x, y, px):
"""k-th energy data
Parameters
----------
k : int
k-th energy
x : array
left data
y : array
right data
px : int
px (set value)
"""
self.x[k] = np.concatenate((self.x[k], x)) if self.x[k] is not None else x
self.y[k] = np.concatenate((self.y[k], y)) if self.y[k] is not None else y
self.px[k] = (
np.concatenate((self.px[k], np.full(len(x), px)))
if self.px[k] is not None
else np.full(len(x), px)
)
def clip(self):
"""Using Gaussian Mixture Method (GMM) to decompose the data into noise and available data"""
data = GMM_clip(
np.array(list(zip(self.x[0], self.y[0], self.px[0])), dtype=object)
)
self.x[0] = data[:, 0]
self.y[0] = data[:, 1]
self.px[0] = data[:, 2]
data = GMM_clip(
np.array(list(zip(self.x[1], self.y[1], self.px[1])), dtype=object)
)
self.x[1] = data[:, 0]
self.y[1] = data[:, 1]
self.px[1] = data[:, 2]
def get_line(self):
"""Fit data with $y = -\\frac{k_1}{k_2}x + \\frac{E-b_1-b_2}{k_2}$."""
model = Linear1D()
reg = fit_line(model, self.x[0], self.y[0])
self.K1 = reg.slope.value
self.C1 = reg.intercept.value
reg = fit_line(model, self.x[1], self.y[1])
self.K2 = reg.slope.value
self.C2 = reg.intercept.value
def get_kb(self, bias, deltaE=4):
"""Get $k_2$, $k_2$, $b$ from $K_i$, $C_i$
Set slope equal average of $K_i$.
Parameters
----------
bias : float
bias $b = b_1 + b_2$
deltaE : float, optional
Energy difference between the two beams
"""
K = (self.K1 + self.K2) / 2
self.K1 = self.K2 = K
model = FixedSlopeLine(slope=K, intercept=self.C1)
fitted_model = fit_line(model, self.x[0], self.y[0])
self.C1 = fitted_model.intercept.value
model = FixedSlopeLine(slope=K, intercept=self.C2)
fitted_model = fit_line(model, self.x[1], self.y[1])
self.C2 = fitted_model.intercept.value
self.k2 = deltaE / abs(self.C1 - self.C2)
self.k1 = -self.k2 * K
eng = self.k1 * self.x[0] + self.k2 * self.y[0]
fitted_model = fit_hist_gaussian(eng, step=0.01)
self.E1 = fitted_model.mean.value
eng = self.k1 * self.x[1] + self.k2 * self.y[1]
fitted_model = fit_hist_gaussian(eng, step=0.01)
self.E2 = fitted_model.mean.value
self.b = bias - self.C1 * self.k2
self.E1 += self.b
self.E2 += self.b
def get_peak_center(self):
"""Get peak center (in channel) using Gaussian Model"""
self.fx, self.fy, self.fz = [], [], []
peaks = np.unique(self.px[0])
for px in peaks:
idx = np.where(self.px[0] == px)[0]
x, y = self.x[0][idx], self.y[0][idx]
if len(idx) < 400:
continue
fitted_model = fit_hist_gaussian(x)
self.fx.append(fitted_model.mean.value)
fitted_model = fit_hist_gaussian(y)
self.fy.append(fitted_model.mean.value)
self.fz.append(px)
self.fx = np.array(self.fx)
self.fy = np.array(self.fy)
self.fz = np.array(self.fz)
def fit_px(self):
"""Fit using $Px = \\frac{k_2y - k_1x}{E}L+CL$."""
model = Linear1D()
reg = fit_line(model, self.fx, self.fz)
E = np.mean(self.k1 * self.fx + self.k2 * self.fy + self.b)
L = reg.slope.value * E / (self.k2 * self.K1 - self.k1)
C = (reg.intercept.value - self.k2 * self.C1 * L / E) / L
model = pXLine(L=L, C=C, k1=self.k1, k2=self.k2, b=self.b)
reg = fit_line(
model, np.array(list(zip(self.fx, self.fy)), dtype=object), self.fz
)
self.L = reg.L.value
self.C = reg.C.value
def predict_energy(self, x, y):
"""Use $E(x,y) = k_1x + b_1 + k_2y + b_2$. to calculate energy.
Parameters
----------
x/y : array
data
"""
return self.k1 * x + self.k2 * y + self.b
def predict_px(self, x, y):
"""Use $Px = \\frac{k_2y - k_1x}{E}L+CL$ to calculate pX.
Parameters
----------
x/y : array
data
"""
eng = self.predict_energy(x, y)
return (self.k2 * y - self.k1 * x) / eng * self.L + self.C * self.L
def draw_fit_line(self, title):
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
ax.scatter(self.x[0], self.y[0], s=0.1, c="black", label=r"$E_1$")
ax.scatter(self.x[1], self.y[1], s=0.1, c="dimgray", label=r"$E_2$")
ax.plot(
self.x[0],
self.K1 * self.x[0] + self.C1,
c="red",
label=r"$x_2={:.4f}x_1+{:.4f},\ R^2={:.5f}$".format(
self.K1, self.C1, self.RSquare1
),
)
ax.plot(
self.x[1],
self.K2 * self.x[1] + self.C2,
c="orangered",
label=r"$x_2={:.4f}x_1+{:.4f},\ R^2={:.5f}$".format(
self.K2, self.C2, self.RSquare2
),
)
plt.title("Y - X Line")
plt.xlabel("Output - Left (X) / (channel)")
plt.ylabel("Output - Right (Y) / (channel)")
ax.legend()
fig.savefig(title, facecolor="w", transparent=False, bbox_inches='tight')
plt.close()
def draw_peak(self, title):
fig = plt.figure(figsize=(8, 8))
peaks = np.unique(self.px[0])
n, k = len(peaks), 1
for px in peaks:
ax = fig.add_subplot(n, 1, k)
ax.set_title("pX = {:.2f}".format(px))
idx = np.where(self.px[0] == px)[0]
x, y = self.x[0][idx], self.y[0][idx]
count1, center1 = get_hist(x)
count2, center2 = get_hist(y)
ax.scatter(center1, count1, s=0.5)
ax.scatter(center2, count2, s=0.5)
ax.set_xlabel("Channel")
ax.set_ylabel("Counts per Channel")
k += 1
fig.suptitle("Counts Curve")
plt.tight_layout()
fig.savefig(title, facecolor="w", transparent=False)
plt.close()
def draw_cluster(self, title):
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
peaks = np.unique(self.px[0])
for px in peaks:
idx = np.where(self.px[0] == px)[0]
x, y = self.x[0][idx], self.y[0][idx]
ax.scatter(x, y, s=0.1, label="pX={:.0f}".format(px))
plt.title("Y - X Line")
plt.xlabel("Output - Left (X) / (channel)")
plt.ylabel("Output - Right (Y) / (channel)")
plt.legend()
plt.tight_layout()
fig.savefig(title, facecolor="w", transparent=False)
plt.close()
@property
def name(self):
return "{:d}-{:d}-{:.2f}-{:.2f}".format(self.n, self.m, self.E1, self.E2)
def _r_square(self, y, yp):
mean = np.mean(y)
SST = np.sum((y - mean) ** 2)
SSE = np.sum((y - yp) ** 2)
return 1 - SSE / SST
@property
def RSquare1(self):
return self._r_square(self.y[0], self.K1 * self.x[0] + self.C1)
@property
def RSquare2(self):
return self._r_square(self.y[1], self.K2 * self.x[1] + self.C2)
def __call__(self, data):
"""Data is read to complete initialization"""
self.k1 = data[0]
self.k2 = data[1]
self.b = data[2]
self.L = data[3]
self.C = data[4]
import numpy as np
import matplotlib.pyplot as plt
from .utils import get_hist, GMM_clip
from .model import Linear1D, FixedSlopeLine, pXLine
from .fit import fit_line, fit_hist_gaussian
class Bind(object):
"""Bind of a Block, using data under two energy conditions for fitting.
The energy of an ADC device can be expressed as: $E = kx + b$.
So the complete energy is represented by $E(x,y) = k_1x + b_1 + k_2y + b_2$.
This formula can be converted to: $y = -\\frac{k_1}{k_2}x + \\frac{E-b_1-b_2}{k_2}$.
Plotting the data in a 2D plane and fitting a line, we can get $K_i = -\\frac{k_1}{k_2}$ and $C_i$.
Using data at the two energies $E_1$ and $E_2$, $k_2 = \\frac{E_1 - E_2}{C_1 - C_2}$.
The incident position produces different responses in the left and right detectors, which can be expressed as:
$Px = \\frac{k_2y - k_1x}{E}L+CL$.
Attributes
----------
L/C : float
parameters in $Px = \\frac{k_2y - k_1x}{E}L+CL$.
k1, k2 : float
calibration slopes in $E(x,y) = k_1x + k_2y + b$.
b : float
$b = b_1 + b_2$
K1/K2/C1/C2: float
$K_i = -\\frac{k_1}{k_2}$
$C_i = \\frac{E_i-b}{k_2}$
"""
def __init__(self, n, m):
self.n, self.m = n, m
self.px = [None, None]
self.x, self.y = [None, None], [None, None]
self.L, self.C = 40, 0
self.k1, self.k2, self.b = 0, 0, 0
self.K1, self.C1, self.K2, self.C2 = 0, 0, 0, 0
def add_data(self, k, x, y, px):
"""k-th energy data
Parameters
----------
k : int
k-th energy
x : array
left data
y : array
right data
px : int
px (set value)
"""
self.x[k] = np.concatenate((self.x[k], x)) if self.x[k] is not None else x
self.y[k] = np.concatenate((self.y[k], y)) if self.y[k] is not None else y
self.px[k] = (
np.concatenate((self.px[k], np.full(len(x), px)))
if self.px[k] is not None
else np.full(len(x), px)
)
def clip(self):
"""Using Gaussian Mixture Method (GMM) to decompose the data into noise and available data"""
data = GMM_clip(
np.array(list(zip(self.x[0], self.y[0], self.px[0])), dtype=object)
)
self.x[0] = data[:, 0]
self.y[0] = data[:, 1]
self.px[0] = data[:, 2]
data = GMM_clip(
np.array(list(zip(self.x[1], self.y[1], self.px[1])), dtype=object)
)
self.x[1] = data[:, 0]
self.y[1] = data[:, 1]
self.px[1] = data[:, 2]
def get_line(self):
"""Fit data with $y = -\\frac{k_1}{k_2}x + \\frac{E-b_1-b_2}{k_2}$."""
model = Linear1D()
reg = fit_line(model, self.x[0], self.y[0])
self.K1 = reg.slope.value
self.C1 = reg.intercept.value
reg = fit_line(model, self.x[1], self.y[1])
self.K2 = reg.slope.value
self.C2 = reg.intercept.value
def get_kb(self, bias, deltaE=4):
"""Get $k_2$, $k_2$, $b$ from $K_i$, $C_i$
Set slope equal average of $K_i$.
Parameters
----------
bias : float
bias $b = b_1 + b_2$
deltaE : float, optional
Energy difference between the two beams
"""
K = (self.K1 + self.K2) / 2
self.K1 = self.K2 = K
model = FixedSlopeLine(slope=K, intercept=self.C1)
fitted_model = fit_line(model, self.x[0], self.y[0])
self.C1 = fitted_model.intercept.value
model = FixedSlopeLine(slope=K, intercept=self.C2)
fitted_model = fit_line(model, self.x[1], self.y[1])
self.C2 = fitted_model.intercept.value
self.k2 = deltaE / abs(self.C1 - self.C2)
self.k1 = -self.k2 * K
eng = self.k1 * self.x[0] + self.k2 * self.y[0]
fitted_model = fit_hist_gaussian(eng, step=0.01)
self.E1 = fitted_model.mean.value
eng = self.k1 * self.x[1] + self.k2 * self.y[1]
fitted_model = fit_hist_gaussian(eng, step=0.01)
self.E2 = fitted_model.mean.value
self.b = bias - self.C1 * self.k2
self.E1 += self.b
self.E2 += self.b
def get_peak_center(self):
"""Get peak center (in channel) using Gaussian Model"""
self.fx, self.fy, self.fz = [], [], []
peaks = np.unique(self.px[0])
for px in peaks:
idx = np.where(self.px[0] == px)[0]
x, y = self.x[0][idx], self.y[0][idx]
if len(idx) < 400:
continue
fitted_model = fit_hist_gaussian(x)
self.fx.append(fitted_model.mean.value)
fitted_model = fit_hist_gaussian(y)
self.fy.append(fitted_model.mean.value)
self.fz.append(px)
self.fx = np.array(self.fx)
self.fy = np.array(self.fy)
self.fz = np.array(self.fz)
def fit_px(self):
"""Fit using $Px = \\frac{k_2y - k_1x}{E}L+CL$."""
model = Linear1D()
reg = fit_line(model, self.fx, self.fz)
E = np.mean(self.k1 * self.fx + self.k2 * self.fy + self.b)
L = reg.slope.value * E / (self.k2 * self.K1 - self.k1)
C = (reg.intercept.value - self.k2 * self.C1 * L / E) / L
model = pXLine(L=L, C=C, k1=self.k1, k2=self.k2, b=self.b)
reg = fit_line(
model, np.array(list(zip(self.fx, self.fy)), dtype=object), self.fz
)
self.L = reg.L.value
self.C = reg.C.value
def predict_energy(self, x, y):
"""Use $E(x,y) = k_1x + b_1 + k_2y + b_2$. to calculate energy.
Parameters
----------
x/y : array
data
"""
return self.k1 * x + self.k2 * y + self.b
def predict_px(self, x, y):
"""Use $Px = \\frac{k_2y - k_1x}{E}L+CL$ to calculate pX.
Parameters
----------
x/y : array
data
"""
eng = self.predict_energy(x, y)
return (self.k2 * y - self.k1 * x) / eng * self.L + self.C * self.L
def draw_fit_line(self, title):
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
ax.scatter(self.x[0], self.y[0], s=0.1, c="black", label=r"$E_1$")
ax.scatter(self.x[1], self.y[1], s=0.1, c="dimgray", label=r"$E_2$")
ax.plot(
self.x[0],
self.K1 * self.x[0] + self.C1,
c="red",
label=r"$x_2={:.4f}x_1+{:.4f},\ R^2={:.5f}$".format(
self.K1, self.C1, self.RSquare1
),
)
ax.plot(
self.x[1],
self.K2 * self.x[1] + self.C2,
c="orangered",
label=r"$x_2={:.4f}x_1+{:.4f},\ R^2={:.5f}$".format(
self.K2, self.C2, self.RSquare2
),
)
plt.title("Y - X Line")
plt.xlabel("Output - Left (X) / (channel)")
plt.ylabel("Output - Right (Y) / (channel)")
ax.legend()
fig.savefig(title, facecolor="w", transparent=False, bbox_inches="tight")
plt.close()
def draw_peak(self, title):
fig = plt.figure(figsize=(8, 8))
peaks = np.unique(self.px[0])
n, k = len(peaks), 1
for px in peaks:
ax = fig.add_subplot(n, 1, k)
ax.set_title("pX = {:.2f}".format(px))
idx = np.where(self.px[0] == px)[0]
x, y = self.x[0][idx], self.y[0][idx]
count1, center1 = get_hist(x)
count2, center2 = get_hist(y)
ax.scatter(center1, count1, s=0.5)
ax.scatter(center2, count2, s=0.5)
ax.set_xlabel("Channel")
ax.set_ylabel("Counts per Channel")
k += 1
fig.suptitle("Counts Curve")
plt.tight_layout()
fig.savefig(title, facecolor="w", transparent=False)
plt.close()
def draw_cluster(self, title):
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
peaks = np.unique(self.px[0])
for px in peaks:
idx = np.where(self.px[0] == px)[0]
x, y = self.x[0][idx], self.y[0][idx]
ax.scatter(x, y, s=0.1, label="pX={:.0f}".format(px))
plt.title("Y - X Line")
plt.xlabel("Output - Left (X) / (channel)")
plt.ylabel("Output - Right (Y) / (channel)")
plt.legend()
plt.tight_layout()
fig.savefig(title, facecolor="w", transparent=False)
plt.close()
@property
def name(self):
return "{:d}-{:d}-{:.2f}-{:.2f}".format(self.n, self.m, self.E1, self.E2)
def _r_square(self, y, yp):
mean = np.mean(y)
SST = np.sum((y - mean) ** 2)
SSE = np.sum((y - yp) ** 2)
return 1 - SSE / SST
@property
def RSquare1(self):
return self._r_square(self.y[0], self.K1 * self.x[0] + self.C1)
@property
def RSquare2(self):
return self._r_square(self.y[1], self.K2 * self.x[1] + self.C2)
def __call__(self, data):
"""Data is read to complete initialization"""
self.k1 = data[0]
self.k2 = data[1]
self.b = data[2]
self.L = data[3]
self.C = data[4]
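
For reference, the calibration relations used by get_line, get_kb, and fit_px above reduce to the following; this only restates the docstring formulas, with $\\Delta E$ standing for the deltaE argument:
$E(x, y) = k_1x + b_1 + k_2y + b_2$
$y = -\\frac{k_1}{k_2}x + \\frac{E_i - b_1 - b_2}{k_2} = K_ix + C_i$ (one line per beam energy $E_i$)
$K = \\frac{K_1 + K_2}{2}, \\quad k_2 = \\frac{\\Delta E}{|C_1 - C_2|}, \\quad k_1 = -k_2K$
$Px = \\frac{k_2y - k_1x}{E}L + CL$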


@@ -1,172 +1,172 @@
import os
import csv
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from .Bind import Bind
from .utils import readBlockData, get_hist
class Calibration(object):
"""Calibrate the detector according to the calibration data"""
def __init__(self):
pass
def __call__(self, file1, file2, bias_b, delta_e, n=6, m=8):
"""Calibration
Parameters
----------
file1/2 : str
data file path of energy 1/2
bias_b : float
bias $b = b_1 + b_2$, in MeV
delta_e : float
Energy difference between the two beams, in MeV
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
"""
# Initialization
self.n, self.m = n, m
self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Data
file_list = csv.reader(open(file1, "r"))
pbar = tqdm(desc="Read Data E1", total=len(open(file1, "r").readlines()))
for row in file_list:
pn = int(row[1])
ldata, rdata = readBlockData(row[0], int(row[5]), pn, m)
for i in range(m):
bind = self.binds[pn][i]
bind.add_data(0, ldata[i], rdata[i], float(row[4]))
pbar.update(1)
pbar.close()
file_list = csv.reader(open(file2, "r"))
pbar = tqdm(desc="Read Data E2", total=len(open(file2, "r").readlines()))
for row in file_list:
pn = int(row[1])
ldata, rdata = readBlockData(row[0], int(row[5]), pn, m)
for i in range(m):
bind = self.binds[pn][i]
bind.add_data(1, ldata[i], rdata[i], float(row[4]))
pbar.update(1)
pbar.close()
# Data preprocessing
pbar = tqdm(desc="Bind Process", total=n * m)
for i in range(n):
for j in range(m):
bind: Bind = self.binds[i][j]
bind.clip()
bind.get_line()
bind.get_kb(bias_b, delta_e)
pbar.update(1)
pbar.close()
# Fit
pbar = tqdm(desc="Bind Fit", total=n * m)
for i in range(n):
for j in range(m):
bind: Bind = self.binds[i][j]
bind.get_peak_center()
bind.fit_px()
pbar.update(1)
pbar.close()
def draw_fit(self, path):
"""Draw fit result
Parameters
----------
path : str
save folder; it must already contain the three subfolders `FIT-LINE`, `GMM`, and `PEAK`.
"""
pbar = tqdm(desc="Draw Fit Figure", total=self.n * self.m)
for i in range(self.n):
for j in range(self.m):
bind: Bind = self.binds[i][j]
bind.draw_fit_line(os.path.join(path, "FIT-LINE", bind.name + ".png"))
bind.draw_cluster(os.path.join(path, "GMM", bind.name + ".png"))
bind.draw_peak(os.path.join(path, "PEAK", bind.name + ".png"))
pbar.update(1)
pbar.close()
def draw_check(self, path="Check.png"):
"""Draw check figure
Parameters
----------
path : str, optional
save path
"""
pbar = tqdm(desc="Draw Check Figure", total=self.n * self.m)
fig = plt.figure(figsize=(24, 24), dpi=200)
ax1 = fig.add_subplot(3, 1, 1)
ax2 = fig.add_subplot(3, 1, 2)
ax3 = fig.add_subplot(3, 1, 3)
peaks = np.array([])
for i in range(self.n):
engs = np.array([])
for j in range(self.m):
bind = self.binds[i][j]
peaks = np.hstack((np.unique(bind.px[0]), peaks))
eng = bind.predict_energy(bind.x[0], bind.y[0])
pX = bind.predict_px(bind.x[0], bind.y[0])
count, center = get_hist(pX, step=0.5)
engs = np.hstack((engs, eng))
ax2.scatter(pX, eng, s=0.1, color="k")
ax3.scatter(center, count + 3000 * (7 - j), s=0.5, color="k")
pbar.update(1)
count, center = get_hist(engs, step=0.01)
ax1.plot(center, count)
peaks = np.unique(peaks)
for x in peaks:
ax3.vlines(x, 0, 3000 * self.m, color="gray", linestyles="dashed")
for j in range(self.m):
ax3.hlines(
3000 * j,
(np.min(peaks) // 50) * 50,
(np.max(peaks) // 50 + 1) * 50,
color="r",
linestyles="dashdot",
)
ax1.set_xlabel("Energy (MeV)")
ax1.set_ylabel("Count per bin")
ax2.set_xlabel("x (mm)")
ax2.set_ylabel("Energy (MeV)")
ax3.set_xlabel("x (mm)")
ax3.set_ylabel("Count per bin")
fig.savefig(path, facecolor="w", transparent=False)
plt.close()
pbar.close()
def save(self, path="coef.csv"):
"""Save coefficient to file
Parameters
----------
path : str, optional
save path
"""
f = open(path, "w")
for i in range(self.n):
for j in range(self.m):
bind = self.binds[i][j]
f.writelines(
"{:d},{:d},{:.9f},{:.9f},{:.9f},{:.9f},{:.9f}\n".format(
i, j, bind.k1, bind.k2, bind.b, bind.L, bind.C
)
)
import os
import csv
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from .bind import Bind
from .utils import readBlockData, get_hist
class Calibration(object):
"""Calibrate the detector according to the calibration data"""
def __init__(self):
pass
def __call__(self, file1, file2, bias_b, delta_e, n=6, m=8):
"""Calibration
Parameters
----------
file1/2 : str
data file path of energy 1/2
bias_b : float
bias $b = b_1 + b_2$, in MeV
delta_e : float
Energy difference between the two beams, in MeV
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
"""
# Initialization
self.n, self.m = n, m
self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Data
file_list = csv.reader(open(file1, "r"))
pbar = tqdm(desc="Read Data E1", total=len(open(file1, "r").readlines()))
for row in file_list:
pn = int(row[1])
ldata, rdata = readBlockData(row[0], int(row[3]), pn, m)
for i in range(m):
bind = self.binds[pn][i]
bind.add_data(0, ldata[i], rdata[i], float(row[2]))
pbar.update(1)
pbar.close()
file_list = csv.reader(open(file2, "r"))
pbar = tqdm(desc="Read Data E2", total=len(open(file2, "r").readlines()))
for row in file_list:
pn = int(row[1])
ldata, rdata = readBlockData(row[0], int(row[3]), pn, m)
for i in range(m):
bind = self.binds[pn][i]
bind.add_data(1, ldata[i], rdata[i], float(row[2]))
pbar.update(1)
pbar.close()
# Data preprocessing
pbar = tqdm(desc="Bind Process", total=n * m)
for i in range(n):
for j in range(m):
bind: Bind = self.binds[i][j]
bind.clip()
bind.get_line()
bind.get_kb(bias_b, delta_e)
pbar.update(1)
pbar.close()
# Fit
pbar = tqdm(desc="Bind Fit", total=n * m)
for i in range(n):
for j in range(m):
bind: Bind = self.binds[i][j]
bind.get_peak_center()
bind.fit_px()
pbar.update(1)
pbar.close()
def draw_fit(self, path):
"""Draw fit result
Parameters
----------
path : str
save folder; it must already contain the three subfolders `FIT-LINE`, `GMM`, and `PEAK`.
"""
pbar = tqdm(desc="Draw Fit Figure", total=self.n * self.m)
for i in range(self.n):
for j in range(self.m):
bind: Bind = self.binds[i][j]
bind.draw_fit_line(os.path.join(path, "FIT-LINE", bind.name + ".png"))
bind.draw_cluster(os.path.join(path, "GMM", bind.name + ".png"))
bind.draw_peak(os.path.join(path, "PEAK", bind.name + ".png"))
pbar.update(1)
pbar.close()
def draw_check(self, path="Check.png"):
"""Draw check figure
Parameters
----------
path : str, optional
save path
"""
pbar = tqdm(desc="Draw Check Figure", total=self.n * self.m)
fig = plt.figure(figsize=(24, 24), dpi=200)
ax1 = fig.add_subplot(3, 1, 1)
ax2 = fig.add_subplot(3, 1, 2)
ax3 = fig.add_subplot(3, 1, 3)
peaks = np.array([])
for i in range(self.n):
engs = np.array([])
for j in range(self.m):
bind = self.binds[i][j]
peaks = np.hstack((np.unique(bind.px[0]), peaks))
eng = bind.predict_energy(bind.x[0], bind.y[0])
pX = bind.predict_px(bind.x[0], bind.y[0])
count, center = get_hist(pX, step=0.5)
engs = np.hstack((engs, eng))
ax2.scatter(pX, eng, s=0.1, color="k")
ax3.scatter(center, count + 3000 * (7 - j), s=0.5, color="k")
pbar.update(1)
count, center = get_hist(engs, step=0.01)
ax1.plot(center, count)
peaks = np.unique(peaks)
for x in peaks:
ax3.vlines(x, 0, 3000 * self.m, color="gray", linestyles="dashed")
for j in range(self.m):
ax3.hlines(
3000 * j,
(np.min(peaks) // 50) * 50,
(np.max(peaks) // 50 + 1) * 50,
color="r",
linestyles="dashdot",
)
ax1.set_xlabel("Energy (MeV)")
ax1.set_ylabel("Count per bin")
ax2.set_xlabel("x (mm)")
ax2.set_ylabel("Energy (MeV)")
ax3.set_xlabel("x (mm)")
ax3.set_ylabel("Count per bin")
fig.savefig(path, facecolor="w", transparent=False)
plt.close()
pbar.close()
def save(self, path="coef.csv"):
"""Save coefficient to file
Parameters
----------
path : str, optional
save path
"""
f = open(path, "w")
for i in range(self.n):
for j in range(self.m):
bind = self.binds[i][j]
f.writelines(
"{:d},{:d},{:.9f},{:.9f},{:.9f},{:.9f},{:.9f}\n".format(
i, j, bind.k1, bind.k2, bind.b, bind.L, bind.C
)
)
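
A minimal driver for the Calibration class above might look like the sketch below; the import path, file names, and numeric values are placeholders, and only methods whose signatures appear in this diff are used.

import os
from calibration import Calibration  # import path is an assumption

cal = Calibration()
# file1/file2 are CSV task lists; in the new version each row is read as
# (root file path, block index, set pX value, event count).
cal("tasks_E1.csv", "tasks_E2.csv", bias_b=15.0, delta_e=4.0, n=6, m=8)

out = "result"
for sub in ("FIT-LINE", "GMM", "PEAK"):  # draw_fit expects these subfolders to exist
    os.makedirs(os.path.join(out, sub), exist_ok=True)
cal.draw_fit(out)
cal.draw_check(os.path.join(out, "Check.png"))
cal.save(os.path.join(out, "coef.csv"))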


@@ -1,56 +1,56 @@
import numpy as np
from astropy.modeling import models, fitting
from .utils import get_hist
def fit_line(model, x, y):
"""
Fit a line model (kx + b) to the data; when x has two columns, a two-variable model is fitted instead.
Parameters
----------
model : astropy.modeling.Model
fit model
x : array
x data, can have two columns representing two independent variables
y : array
y data
Returns
-------
fitted_model : astropy.modeling.Model
The fitted line model
"""
fitter = fitting.LevMarLSQFitter()
if np.ndim(x) == 1:
fitted_model = fitter(model, x, y)
else:
fitted_model = fitter(model, x[:, 0], x[:, 1], y)
return fitted_model
def fit_hist_gaussian(x, step=1):
"""
Gaussian fitting is performed on the histogram
Parameters
----------
x : array
data point
step : int, optional
Minimum bin width. The bin width is an integer multiple of step.
Returns
-------
fitted_model : astropy.modeling.Model
The fitted Gaussian model
"""
fitter = fitting.LMLSQFitter()
count, center = get_hist(x, step=step)
model = models.Gaussian1D(amplitude=count.max(), mean=x.mean(), stddev=x.std())
fitted_model = fitter(model, center, count)
return fitted_model
import numpy as np
from astropy.modeling import models, fitting
from .utils import get_hist
def fit_line(model, x, y):
"""
Fit a line model (kx + b) to the data; when x has two columns, a two-variable model is fitted instead.
Parameters
----------
model : astropy.modeling.Model
fit model
x : array
x data, can have two columns representing two independent variables
y : array
y data
Returns
-------
fitted_model : astropy.modeling.Model
The fitted line model
"""
fitter = fitting.LevMarLSQFitter()
if np.ndim(x) == 1:
fitted_model = fitter(model, x, y)
else:
fitted_model = fitter(model, x[:, 0], x[:, 1], y)
return fitted_model
def fit_hist_gaussian(x, step=1):
"""
Gaussian fitting is performed on the histogram
Parameters
----------
x : array
data point
step : int, optional
Minimum bin width. The bin width is an integer multiple of step.
Returns
-------
fitted_model : astropy.modeling.Model
The fitted Gaussian model
"""
fitter = fitting.LMLSQFitter()
count, center = get_hist(x, step=step)
model = models.Gaussian1D(amplitude=count.max(), mean=x.mean(), stddev=x.std())
fitted_model = fitter(model, center, count)
return fitted_model
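
A small self-contained check of the two helpers above on synthetic data; the flat import paths are an assumption (the diff only shows the package-relative form).

import numpy as np
from fit import fit_line, fit_hist_gaussian  # import paths are assumptions
from model import Linear1D

rng = np.random.default_rng(0)

# fit_line: recover the slope and intercept of a noisy line
x = np.linspace(0.0, 100.0, 500)
y = 2.0 * x + 5.0 + rng.normal(0.0, 1.0, x.size)
line = fit_line(Linear1D(), x, y)
print(line.slope.value, line.intercept.value)  # close to 2.0 and 5.0

# fit_hist_gaussian: recover the mean of a Gaussian sample from its histogram
samples = rng.normal(100.0, 5.0, 20000)
g = fit_hist_gaussian(samples, step=1)
print(g.mean.value, g.stddev.value)  # close to 100 and 5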


@@ -1,86 +1,86 @@
import numpy as np
from astropy.modeling import Fittable1DModel, Fittable2DModel, Parameter
class Linear1D(Fittable1DModel):
"""
One dimensional Line model.
Parameters
----------
slope : float
Slope of the straight line
intercept : float
Intercept of the straight line
"""
slope = Parameter(default=1, description="Slope of the straight line")
intercept = Parameter(default=0, description="Intercept of the straight line")
@staticmethod
def evaluate(x, slope, intercept):
return slope * x + intercept
@staticmethod
def fit_deriv(x, slope, intercept):
d_slope = x
d_intercept = np.ones_like(x)
return [d_slope, d_intercept]
class FixedSlopeLine(Fittable1DModel):
"""
Linear model, but fix the slope
Parameters
----------
slope : float
Slope of the line
intercept : float
Intercept of the line
"""
slope = Parameter()
intercept = Parameter()
@staticmethod
def evaluate(x, slope, intercept):
return slope * x + intercept
@staticmethod
def fit_deriv(x, slope, intercept):
d_slope = np.zeros_like(x)
d_intercept = np.ones_like(x)
return [d_slope, d_intercept]
class pXLine(Fittable2DModel):
"""
pX linear model.
$Px = \\frac{k_2y - k_1x}{E}L+CL$.
Parameters
----------
L : float
Slope of the line when $Px$ is written as a function of $(k_2y - k_1x)/E$
C : float
Offset; the constant term of the line is $CL$
"""
linear = True
L, C = Parameter(default=40), Parameter(default=0)
k1, k2, b = Parameter(), Parameter(), Parameter()
@staticmethod
def evaluate(x, y, L, C, k1, k2, b):
E = k1 * x + k2 * y + b
return (k2 * y - k1 * x) / E * L + C * L
@staticmethod
def fit_deriv(x, y, L, C, k1, k2, b):
E = k1 * x + k2 * y + b
d_L = (k2 * y - k1 * x) / E + C
d_C = np.full(x.shape, L)
d_k1, d_k2, d_b = np.zeros_like(x), np.zeros_like(x), np.zeros_like(x)
return [d_L, d_C, d_k1, d_k2, d_b]
import numpy as np
from astropy.modeling import Fittable1DModel, Fittable2DModel, Parameter
class Linear1D(Fittable1DModel):
"""
One dimensional Line model.
Parameters
----------
slope : float
Slope of the straight line
intercept : float
Intercept of the straight line
"""
slope = Parameter(default=1, description="Slope of the straight line")
intercept = Parameter(default=0, description="Intercept of the straight line")
@staticmethod
def evaluate(x, slope, intercept):
return slope * x + intercept
@staticmethod
def fit_deriv(x, slope, intercept):
d_slope = x
d_intercept = np.ones_like(x)
return [d_slope, d_intercept]
class FixedSlopeLine(Fittable1DModel):
"""
Linear model, but fix the slope
Parameters
----------
slope : float
Slope of the line
intercept : float
Intercept of the line
"""
slope = Parameter()
intercept = Parameter()
@staticmethod
def evaluate(x, slope, intercept):
return slope * x + intercept
@staticmethod
def fit_deriv(x, slope, intercept):
d_slope = np.zeros_like(x)
d_intercept = np.ones_like(x)
return [d_slope, d_intercept]
class pXLine(Fittable2DModel):
"""
pX linear model.
$Px = \\frac{k_2y - k_1x}{E}L+CL$.
Parameters
----------
L : float
Slope of the line when $Px$ is written as a function of $(k_2y - k_1x)/E$
C : float
Offset; the constant term of the line is $CL$
"""
linear = True
L, C = Parameter(default=40), Parameter(default=0)
k1, k2, b = Parameter(), Parameter(), Parameter()
@staticmethod
def evaluate(x, y, L, C, k1, k2, b):
E = k1 * x + k2 * y + b
return (k2 * y - k1 * x) / E * L + C * L
@staticmethod
def fit_deriv(x, y, L, C, k1, k2, b):
E = k1 * x + k2 * y + b
d_L = (k2 * y - k1 * x) / E + C
d_C = np.full(x.shape, L)
d_k1, d_k2, d_b = np.zeros_like(x), np.zeros_like(x), np.zeros_like(x)
return [d_L, d_C, d_k1, d_k2, d_b]
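
A quick numerical sanity check of pXLine against the formula in its docstring; the coefficient values are arbitrary and the import path is an assumption.

import numpy as np
from model import pXLine  # import path is an assumption

k1, k2, b = 0.0020, 0.0021, 0.5  # arbitrary calibration coefficients
L, C = 40.0, 0.1                 # arbitrary position parameters
x = np.array([1200.0, 1500.0, 1800.0])
y = np.array([1400.0, 1100.0, 900.0])

model = pXLine(L=L, C=C, k1=k1, k2=k2, b=b)
E = k1 * x + k2 * y + b
expected = (k2 * y - k1 * x) / E * L + C * L
assert np.allclose(model(x, y), expected)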


@@ -1,131 +1,131 @@
import csv
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from .Bind import Bind
from .fit import fit_line
from .model import Linear1D
from .utils import readFileData, get_hist
class Process(object):
"""Process the experimental data according to the calibration results."""
def __init__(self) -> None:
pass
def __call__(self, coef, task, n=6, m=8):
"""Read Process Data
coef : str
coefficient file
task : str
task file
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
"""
# Initialization
self.n, self.m = n, m
self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Calibration Data
pbar = tqdm(desc="Bind Initialization", total=n * m)
data = list(csv.reader(open(coef, "r")))
data = np.array(data, dtype=np.float64)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
bind(data[j + i * m][2:])
pbar.update(1)
pbar.close()
# Read Data
total = len(open(task, "r").readlines()) * n * m
file_list = csv.reader(open(task, "r"))
self.pX = np.array([])
self.eng = np.array([])
pbar = tqdm(desc="Read Data", total=total)
for row in file_list:
ldata, rdata = readFileData(row[0], n, m)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
x = bind.predict_px(ldata[j + i * m], rdata[j + i * m]) + float(
row[1]
)
e = bind.predict_energy(ldata[j + i * m], rdata[j + i * m])
edge_l = 5 + 130 * i + float(row[1]) - 35
edge_r = edge_l + 65
idx = np.where((x >= edge_l) & (x <= edge_r))[0]
self.pX = np.hstack((self.pX, x[idx]))
self.eng = np.hstack((self.eng, e[idx]))
pbar.update(1)
pbar.close()
def energy_filter(self, lower, upper, sigma=5.0, maxiters=5):
"""Fit px - E line and do sigma clip iteratively.
Parameters
----------
lower/upper : float
Lower and upper bounds of the initial energy selection
sigma: float, optional
The number of standard deviations to use for both the lower and upper clipping limit.
maxiters : int, optional
The maximum number of sigma-clipping iterations to perform.
If convergence is reached before maxiters iterations, the clipping stops early.
"""
model = Linear1D()
idx = np.where((self.eng >= lower) & (self.eng <= upper))[0]
x, y = self.pX[idx], self.eng[idx]
for i in range(maxiters):
reg = fit_line(model, x, y)
err = np.abs(y - reg(x))
idx = np.where(err <= sigma * np.std(err))[0]
if len(idx) == len(x):
break
x, y = x[idx], y[idx]
self.pX_n = x
self.eng_n = y
self.reg = reg
def draw_result(self, path="result.png"):
"""Draw the processing result
Parameters
----------
path : str, optional
save path
"""
fig = plt.figure(figsize=(24, 12), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
count, center = get_hist(self.pX_n, step=0.1)
ax1.scatter(self.pX, self.eng, s=0.01, color="black")
ax1.scatter(self.pX_n, self.eng_n, s=0.01, color="orange")
ax2.step(center, count, where="post", color="k")
px_min = (np.min(self.pX_n) // 50) * 50
px_max = (np.max(self.pX_n) // 50 + 1) * 50
px_x = np.linspace(px_min, px_max, int(px_max - px_min))
ax1.plot(px_x, self.reg(px_x))
ax1.set_xticks(np.arange(px_min, px_max, 50))
ax2.set_xticks(np.arange(px_min, px_max, 50))
ax1.set_xlabel("x (mm)")
ax1.set_ylabel("Energy (MeV)")
ax2.set_xlabel("x (mm)")
ax2.set_ylabel("Count per bin")
fig.savefig(path, facecolor="w", transparent=False)
plt.close()
import csv
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from .bind import Bind
from .fit import fit_line
from .model import Linear1D
from .utils import readFileData, get_hist
class Process(object):
"""Process the experimental data according to the calibration results."""
def __init__(self) -> None:
pass
def __call__(self, coef, task, n=6, m=8):
"""Read Process Data
coef : str
coefficient file
task : str
task file
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
"""
# Initialization
self.n, self.m = n, m
self.binds = [[Bind(i, j) for j in range(m)] for i in range(n)]
# Read Calibration Data
pbar = tqdm(desc="Bind Initialization", total=n * m)
data = list(csv.reader(open(coef, "r")))
data = np.array(data, dtype=np.float64)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
bind(data[j + i * m][2:])
pbar.update(1)
pbar.close()
# Read Data
total = len(open(task, "r").readlines()) * n * m
file_list = csv.reader(open(task, "r"))
self.pX = np.array([])
self.eng = np.array([])
pbar = tqdm(desc="Read Data", total=total)
for row in file_list:
ldata, rdata = readFileData(row[0], n, m)
for i in range(n):
for j in range(m):
bind = self.binds[i][j]
x = bind.predict_px(ldata[j + i * m], rdata[j + i * m]) + float(
row[1]
)
e = bind.predict_energy(ldata[j + i * m], rdata[j + i * m])
edge_l = 5 + 130 * i + float(row[1]) - 35
edge_r = edge_l + 65
idx = np.where((x >= edge_l) & (x <= edge_r))[0]
self.pX = np.hstack((self.pX, x[idx]))
self.eng = np.hstack((self.eng, e[idx]))
pbar.update(1)
pbar.close()
def energy_filter(self, lower, upper, sigma=5.0, maxiters=5):
"""Fit px - E line and do sigma clip iteratively.
Parameters
----------
lower/upper : float
Lower and upper bounds of the initial energy selection
sigma: float, optional
The number of standard deviations to use for both the lower and upper clipping limit.
maxiters : int, optional
The maximum number of sigma-clipping iterations to perform.
If convergence is reached before maxiters iterations, the clipping stops early.
"""
model = Linear1D()
idx = np.where((self.eng >= lower) & (self.eng <= upper))[0]
x, y = self.pX[idx], self.eng[idx]
for i in range(maxiters):
reg = fit_line(model, x, y)
err = np.abs(y - reg(x))
idx = np.where(err <= sigma * np.std(err))[0]
if len(idx) == len(x):
break
x, y = x[idx], y[idx]
self.pX_n = x
self.eng_n = y
self.reg = reg
def draw_result(self, path="result.png"):
"""Draw the processing result
Parameters
----------
path : str, optional
save path
"""
fig = plt.figure(figsize=(24, 12), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
count, center = get_hist(self.pX_n, step=0.1)
ax1.scatter(self.pX, self.eng, s=0.01, color="black")
ax1.scatter(self.pX_n, self.eng_n, s=0.01, color="orange")
ax2.step(center, count, where="post", color="k")
px_min = (np.min(self.pX_n) // 50) * 50
px_max = (np.max(self.pX_n) // 50 + 1) * 50
px_x = np.linspace(px_min, px_max, int(px_max - px_min))
ax1.plot(px_x, self.reg(px_x))
ax1.set_xticks(np.arange(px_min, px_max, 50))
ax2.set_xticks(np.arange(px_min, px_max, 50))
ax1.set_xlabel("x (mm)")
ax1.set_ylabel("Energy (MeV)")
ax2.set_xlabel("x (mm)")
ax2.set_ylabel("Count per bin")
fig.savefig(path, facecolor="w", transparent=False)
plt.close()
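
A minimal end-to-end sketch of driving Process with the coefficient file written by Calibration.save; the file names, the energy window, and the import path are placeholders.

from process import Process  # import path is an assumption

proc = Process()
# coef.csv: one line per bind, (block, bind, k1, k2, b, L, C), as written by Calibration.save.
# task.csv: one line per run, read here as (root file path, position offset added to pX).
proc("coef.csv", "task.csv", n=6, m=8)
proc.energy_filter(lower=10.0, upper=20.0, sigma=5.0, maxiters=5)
proc.draw_result("result.png")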


@@ -1,141 +1,141 @@
import uproot
import numpy as np
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture
def readFileData(file, count, n=6, m=8, minT=800, maxT=4000):
"""Read whole data from root file
Parameters
----------
file : str
root file path
count : int
Number of events to read, normalized by the Faraday cup count
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
minT/maxT : int, optional
Lower/upper thresholds; only events whose left + right sum lies in [minT, maxT] are kept
"""
data = uproot.open(file)["Tree1"]
ldata, rdata = [], []
for i in range(n):
for j in range(m):
na = i // 2
nc = j + 2 * m * (i % 2)
x = data["adc{:d}ch{:d}".format(na, nc)].array(library="np")[:count]
y = data["adc{:d}ch{:d}".format(na, nc + m)].array(library="np")[:count]
idx = np.where((x + y >= minT) & (x + y <= maxT))[0]
ldata.append(x[idx])
rdata.append(y[idx])
return ldata, rdata
def readBlockData(file, count, n, m=8, minT=800, maxT=4000):
"""Read block data from root file
Parameters
----------
file : str
root file path
count : int
Number of events to read, normalized by the Faraday cup count
n : int
Index of the block to read
m : int, optional
number of binds, default 8
minT/maxT : int, optional
Lower/upper thresholds; only events whose left + right sum lies in [minT, maxT] are kept
"""
data = uproot.open(file)["Tree1"]
ldata, rdata = [], []
for j in range(m):
na = n // 2
nc = j + 2 * m * (n % 2)
x = data["adc{:d}ch{:d}".format(na, nc)].array(library="np")[:count]
y = data["adc{:d}ch{:d}".format(na, nc + m)].array(library="np")[:count]
idx = np.where((x + y >= minT) & (x + y <= maxT))[0]
ldata.append(x[idx])
rdata.append(y[idx])
return ldata, rdata
def draw_scatter(data, title, s=0.1):
"""Draw points using scatter
Parameters
----------
s : float, optional
size of scatter point, default 0.1
"""
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
for cluster in data:
ax.scatter(cluster[:, 0], cluster[:, 1], s=s)
fig.savefig(title, facecolor="w", transparent=False)
plt.close()
def get_hist(data, step=1, maxN=50, return_edge=False):
"""Gets the boundary of histogram that the maximum count is bigger than threshold
Parameters
----------
step : int, optional
Minimum bin width. The bin width is an integer multiple of step.
maxN : int, optional
Maximum count threshold
return_edge: bool, optional
If True, then the bin edges are also returned.
"""
delta = step
edge = np.arange(data.min(), data.max() + 1, delta)
count, _ = np.histogram(data, bins=edge)
try:
while count.max() <= maxN:
delta += step
edge = np.arange(data.min(), data.max() + 1, delta)
count, _ = np.histogram(data, bins=edge)
except:
edge = np.arange(data.min(), data.max() + 1, step)
count, _ = np.histogram(data, bins=edge)
if return_edge:
return count / delta, (edge[1:] + edge[:-1]) / 2, edge
else:
return count / delta, (edge[1:] + edge[:-1]) / 2
def GMM_clip(data, return_all=False):
"""Using Gaussian Mixture Method (GMM) to decompose the data into noise and available data
Parameters
----------
data : numpy.ndarray
Data to be clipped
return_all : bool, optional
If True, return the predicted cluster labels for all points instead of the clipped data.
"""
fit_data = np.array([])
model = GaussianMixture(n_components=2)
model.fit(data[:, :2])
ny = model.predict(data[:, :2])
for i in np.unique(ny):
idx = np.where(ny == i)[0]
fit_data = idx if len(idx) > len(fit_data) else fit_data
if return_all:
return ny
else:
return data[fit_data]
import uproot
import numpy as np
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture
def readFileData(file, count, n=6, m=8, minT=800, maxT=4000):
"""Read whole data from root file
Parameters
----------
file : str
root file path
count : int
Number of events to read, normalized by the Faraday cup count
n : int, optional
number of blocks, default 6
m : int, optional
number of binds, default 8
minT/maxT : int, optional
Lower/upper thresholds; only events whose left + right sum lies in [minT, maxT] are kept
"""
data = uproot.open(file)["Tree1"]
ldata, rdata = [], []
for i in range(n):
for j in range(m):
na = i // 2
nc = j + 2 * m * (i % 2)
x = data["adc{:d}ch{:d}".format(na, nc)].array(library="np")[:count]
y = data["adc{:d}ch{:d}".format(na, nc + m)].array(library="np")[:count]
idx = np.where((x + y >= minT) & (x + y <= maxT))[0]
ldata.append(x[idx])
rdata.append(y[idx])
return ldata, rdata
def readBlockData(file, count, n, m=8, minT=800, maxT=4000):
"""Read block data from root file
Parameters
----------
file : str
root file path
count : int
Number of events to read, normalized by the Faraday cup count
n : int
Index of the block to read
m : int, optional
number of binds, default 8
minT/maxT : int, optional
Lower/upper thresholds; only events whose left + right sum lies in [minT, maxT] are kept
"""
data = uproot.open(file)["Tree1"]
ldata, rdata = [], []
for j in range(m):
na = n // 2
nc = j + 2 * m * (n % 2)
x = data["adc{:d}ch{:d}".format(na, nc)].array(library="np")[:count]
y = data["adc{:d}ch{:d}".format(na, nc + m)].array(library="np")[:count]
idx = np.where((x + y >= minT) & (x + y <= maxT))[0]
ldata.append(x[idx])
rdata.append(y[idx])
return ldata, rdata
def draw_scatter(data, title, s=0.1):
"""Draw points using scatter
Parameters
----------
s : float, optional
size of scatter point, default 0.1
"""
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
for cluster in data:
ax.scatter(cluster[:, 0], cluster[:, 1], s=s)
fig.savefig(title, facecolor="w", transparent=False)
plt.close()
def get_hist(data, step=1, maxN=50, return_edge=False):
"""Gets the boundary of histogram that the maximum count is bigger than threshold
Parameters
----------
step : int, optional
Minimum bin width. The bin width is an integer multiple of step.
maxN : int, optional
Maximum count threshold
return_edge: bool, optional
If True, then the bin edges are also returned.
"""
delta = step
edge = np.arange(data.min(), data.max() + 1, delta)
count, _ = np.histogram(data, bins=edge)
try:
while count.max() <= maxN:
delta += step
edge = np.arange(data.min(), data.max() + 1, delta)
count, _ = np.histogram(data, bins=edge)
except:
edge = np.arange(data.min(), data.max() + 1, step)
count, _ = np.histogram(data, bins=edge)
if return_edge:
return count / delta, (edge[1:] + edge[:-1]) / 2, edge
else:
return count / delta, (edge[1:] + edge[:-1]) / 2
def GMM_clip(data, return_all=False):
"""Using Gaussian Mixture Method (GMM) to decompose the data into noise and available data
Parameters
----------
data : numpy.ndarray
Data to be clipped
return_all : bool, optional
If True, return the predicted cluster labels for all points instead of the clipped data.
"""
fit_data = np.array([])
model = GaussianMixture(n_components=2)
model.fit(data[:, :2])
ny = model.predict(data[:, :2])
for i in np.unique(ny):
idx = np.where(ny == i)[0]
fit_data = idx if len(idx) > len(fit_data) else fit_data
if return_all:
return ny
else:
return data[fit_data]
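
A short illustration of the adaptive binning in get_hist: with sparse data the bin width is widened (in multiples of step) until some bin holds more than maxN counts. The import path is an assumption.

import numpy as np
from utils import get_hist  # import path is an assumption

rng = np.random.default_rng(1)
data = rng.normal(2000.0, 50.0, 500)  # sparse enough that unit-wide bins stay below maxN

count, center, edge = get_hist(data, step=1, maxN=50, return_edge=True)
width = edge[1] - edge[0]
print(width)                # greater than 1: the bin width was widened automatically
print(count.max() * width)  # raw peak count, since get_hist returns count / width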

Binary file not shown.