""" This module defines the functions that generate the burst train """
import numpy as np
import random
import matplotlib.pyplot as plt
# Here we define the different functions that will simulate the bursts:
# generate_burst_train & punkt_train for burst trains (each of these rely
# on next_burst), and burstensemble for ensemble mode
# ------------------------------------------------------------------------- #
def next_burst( bean, base, x_0, z, t1, dist, xi_p, cfac, mass, radius,
mdot_res = 5e-7, direction=1, debug=False ):
"""
Routine to find the next burst in the series and return its properties
Adapted from sim_burst.pro
:param bean: Beans object, from which the other inputs are read in:
tobs, a, b,
:param base: base flux [MeV/nucleon]
:param x_0: accreted H-fraction
:param z: accreted CNO metallicity
:param t1: time to start burst search at
:param dist: source distance (kpc)
:param xi_p: anisotropy of persistent emission
:param cfac: scale factor for recurrence time, fluence
:param mass: NS mass (M_sun)
:param radius: NS radius (km)
:param mdot_res: convergence precision for mdot iteration
:param direction: forward (+1) or backward (-1) in time
:param debug: set to True to show additional debugging information
:return:
"""
tobs = bean.tobs
debug_plot=False # for now don't do the debug plots
fn = "next_burst"
assert direction in (1,-1)
minmdot = 0.0
maxmdot = 1.0
# Determine the initial guess for mean mdot (linear)
# i0=min([n_elements(a)-1,max(where(t1 gt tobs))])
itobs = np.where(t1 > tobs)[0]
if (len(itobs) == 0) & (direction == -1):
# the start time is before *any* of the observations; don't bother!
return None
if len(itobs) == 0:
# this makes no sense to me; if the t1 value is < all the tobs values, then the
# nearest element would be the zeroth
# itobs = [-1]
itobs = [0]
# i0=max([0,min([len(a)-1,max([i for i, value in enumerate(tobs) if value < t1])])])
# Now that we have a couple of options for interpolation, need to
# remove the reliance on the linear interpolation parameters
# i0 = max([0, min([len(a) - 1, max(itobs)])])
# mdot0 = (0.67 / 8.8) * bean.pflux[itobs[0]] * r1
mdot0 = bean.flux_to_mdot(x_0, dist, xi_p, mass, radius, bean.pflux[itobs[-1]])
if debug:
print("{}: z={}, X_0={}, mdot_0={}, itobs={}".format(
fn, z, x_0, mdot0, itobs[-1] ))
# Calculate the burst properties for the trial mdot value
trial = bean.model(base, z, x_0, mdot0, mass, radius,
corr=bean.corr, interpolator=bean.gi)
if debug:
print ('{}: initial guess mdot0={} @ t1={}, tdel={}, direction={}'.format(fn,mdot0,t1,trial.tdel,direction))
# Now update the mdot with the value averaged over the trial interval
# not sure why trial.tdel has suddenly become a vector
if direction == 1:
mdot = bean.flux_to_mdot(x_0, dist, xi_p, mass, radius,
bean.mean_flux(t1, t1 + trial.tdel[0] / 24.0, bean) )
else:
mdot = bean.flux_to_mdot(x_0, dist, xi_p, mass, radius,
bean.mean_flux(t1 - trial.tdel[0] / 24.0, t1, bean) )
# Now retain the entire history of this iteration, so we can check for loops
mdot_hist = [mdot0]
tdel_hist = [trial.tdel[0]/24.]
nreturn = 0
nreturn_total = 0
while (abs(mdot - mdot_hist[-1]) > mdot_res) \
and (((t1 + trial.tdel / 24.0 < 2.*max(tobs)) & (direction == 1)) \
or ((t1 - trial.tdel / 24.0 > min(tobs)-(max(tobs)-min(tobs))) & (direction == -1))) \
and (mdot > minmdot and mdot < maxmdot):
trial = bean.model(base, z, x_0, mdot, mass, radius,
corr=bean.corr, interpolator=bean.gi)
nreturn = nreturn + 1
nreturn_total = nreturn_total + 1
mdot_hist.append(mdot)
tdel_hist.append(trial.tdel[0]/24.)
if direction == 1:
mdot = bean.flux_to_mdot(x_0, dist, xi_p, mass, radius,
bean.mean_flux(t1, t1 + (trial.tdel[0] / 24.0), bean) )
else:
mdot = bean.flux_to_mdot(x_0, dist, xi_p, mass, radius,
bean.mean_flux(t1 - (trial.tdel[0] / 24.0), t1, bean) )
# Break out of the loop here, if necessary
if nreturn > 10:
e = random.random()
mdot = mdot_hist[-1] * (1.0 - e) + mdot * e
# Perhaps you should try to reset this randomly every 10 steps? - dkg
# Yes, otherwise every trial above 10 steps will be random
nreturn = 0
if nreturn_total > 30:
e = random.random()
mdot = mdot_hist[-1] * (1.0 - e)+ mdot * e
# save the final versions to the history arrays
mdot_hist.append(mdot)
tdel_hist.append(trial.tdel[0]/24.)
if debug:
print('{}: nreturn={}, nreturn_total={}, mdot_hist={}'.format(
fn, nreturn, nreturn_total, mdot_hist))
if debug_plot:
# now produce a diagnostic plot with the debug flag
# plt.plot(t1+np.array(tdel_hist), mdot_hist, '.', label='tdel history')
for tdel in tdel_hist:
plt.axvline(t1+tdel, color='k', ls='--')
# also calculate a bunch of values to compare with
t_arr = np.arange(t1, max(tobs), step=0.1)
m_arr = [0]
t_arr2 = [t1]
for t in t_arr[1:]:
_mdot = bean.flux_to_mdot(x_0, dist, xi_p, mass, radius,
bean.mean_flux(t1, t, bean) )
_tmp = bean.model(base, z, x_0, _mdot, mass, radius,
corr=bean.corr, interpolator=bean.gi)
t_arr2.append(t1+_tmp.tdel[0]/24.)
m_arr.append(_mdot)
plt.plot(t_arr, np.array(t_arr2), '-', label='tdel')
plt.plot(t_arr, t_arr, '-', label='1:1')
plt.xlim((0,1.1*max(t1+np.array(tdel_hist))))
plt.ylim((0,1.1*max(t1+np.array(tdel_hist))))
# plt.plot(np.array(t_arr2), np.array(m_arr), '.')
plt.legend()
plt.show()
# if mdot < minmdot or mdot > maxmdot:
if abs(mdot - mdot_hist[-2]) > mdot_res:
return None
# create array
#print(f'{fn}: mdot={mdot}, tdel={trial.tdel}')
result = np.recarray(
(1,), dtype=[("t2", np.float64), ("e_b", np.float64),
("alpha", np.float64), ("mdot", np.float64)])
# assign elements
result.t2 = t1 + direction * trial.tdel / 24.0
# at one point we were multiplying eb by 0.8 to account for incomlpete
# burning of fuel, as in Goodwin et al (2018).
result.e_b = trial.E_b
result.alpha = trial.alpha
# result.qnuc = tmp.Q_nuc
# result.xbar = tmp.xbar
result.mdot = mdot
return result
# -------------------------------------------------------------------------#
def punkt_train(bean, base, x_0, z, dist, xi_p, mass, radius,
indep=True, full_model=False, debug=False):
"""
Replacement for generate_burst_train. This version will connect burst
sequences only out to a maximum gap of maxgap bursts. If there are more
bursts falling into a gap, it will stop tracking and restart the
sequence at the next (observed) burst.
Input parameters are as for generate_burst_train; maxgap is pulled from
the beans object
:param bean: Beans object, from which the remaining parameters are drawn:
bstart, pflux, pfluxe, tobs, numburstssim, ref_ind
:param base: base flux [MeV/nucleon]
:param x_0: accreted H-fraction
:param z: accreted CNO metallicity
:param dist: source distance (kpc)
:param xi_p: anisotropy of persistent emission
:param mass: NS mass (M_sun)
:param radius: NS radius (km)
:param indep: if set to True, all forward simulations go from the observed bursts, not from the last simulated burst in a train
:param full_model: if set to True, include all the parameters in the
dict that is returned
:param debug: set to True to show additional debugging information
"""
fn = 'punkt_train'
cfac = 1.0 # no longer used
if debug:
print (fn, base, x_0, z, dist, xi_p, mass, radius)
# array of observed bursts is bean.bstart
iburst = 0 # running index of observed bursts
stime, sgap, salpha, se_b, smdot, imatch = [bean.bstart[0]], [], [], [], [], []
while iburst < bean.numburstsobs:
gap = False
# search phase 1; at the start of a new sub-train
_last = stime[-1] if not indep else bean.bstart[iburst]
result_b = next_burst(bean, base, x_0, z, _last,
dist, xi_p, cfac, mass, radius, direction=-1, debug=debug)
# returns a rec_array with elements t2, e_b, alpha, mdot
if result_b is None:
# this can happen if the flux history doesn't cover enough
# time prior to the first observed burst; but is catastrophic
# in terms of the model, since we're short a burst
# We carry on below, but this is trapped in runmodel and a
# negative result returned
assert iburst == 0
else:
salpha.append(result_b.alpha[0])
se_b.append(result_b.e_b[0])
smdot.append(result_b.mdot[0])
imatch.append(len(stime) - 1)
# print (f'-ve search: stime = {stime}, imatch = {imatch}, next = {bean.bstart[iburst+1]}')
# make the first forward step
buffer = []
while (gap == False) & (iburst < bean.numburstsobs):
# this is the loop to find the next burst in a contiguous segment
if debug:
print('\n---> matching burst #{}'.format(iburst + 1))
_last = stime[-1]
if indep:
# restart the train here from the current burst; also wipe the buffer
_last = bean.bstart[iburst]
buffer = []
if len(buffer) == 0:
# only run this step if we're left with an empty buffer after the previous
result_f = next_burst(bean, base, x_0, z, _last,
dist, xi_p, cfac, mass, radius, direction=1, debug=debug)
if result_f is None:
# this can happen if we've gone out beyond the end of
# the flux history, presumably
# need to trigger the exit criterion as well as
# breaking out of the loop
iburst = bean.numburstsobs
break
buffer.append(result_f)
if debug:
print(f'\n1st step: buffer = {buffer}')
else:
result_f = buffer[0]
# buffer = []
if debug:
print(result_f, result_f.t2, type(buffer), type(result_f.t2))
while ((result_f.t2[0] < bean.bstart[min([iburst + 1, bean.numburstsobs - 1])])
& (len(buffer) <= bean.maxgap)):
# loop until the current simulated burst time is later than the next observed time
# (or we've exceeded the missed burst count)
result_f = next_burst(bean, base, x_0, z, result_f.t2[0],
dist, xi_p, cfac, mass, radius, direction=1, debug=debug)
if result_f is None:
# this can happen if we've gone out beyond the end of the flux history,
# but what should we do with the leftover buffer? match the last burst?
break
buffer.append(result_f)
if debug:
print(f'next step: buffer = {buffer}')
if len(buffer) == 0:
# I think we're done
pass
elif len(buffer) == 1:
# if only one burst has been simulated, it must be > the next burst, so add to
# the train
result_f = buffer.pop(0)
stime.append(result_f.t2[0])
if len(imatch) < bean.numburstsobs:
# only do this if imatch does not already have all the bursts matched
# this can happen at the end of the train, where we might want to add
# a last burst past the end of the train BUT it's not matching
imatch.append(len(stime) - 1)
sgap.append(False)
salpha.append(result_f.alpha[0])
se_b.append(result_f.e_b[0])
smdot.append(result_f.mdot[0])
if debug:
print(f'solo buffer #{iburst + 1}, adding time {result_f.t2[0]}')
else:
if debug:
print(len(buffer), iburst, bean.numburstsobs)
sep_last = buffer[-1].t2 - buffer[-2].t2
diff_2last = np.abs(np.array([buffer[-1].t2[0], buffer[-2].t2[0]]) - bean.bstart[iburst + 1])
# print(sep_last, diff_2last)
if min(diff_2last) < sep_last:
if debug:
print('+ve search: got match!')
# if multiple bursts have been simulated, we pick which of the last 2 is closest
ibest = len(buffer) - np.argmin(diff_2last)
# print (ibest, buffer, buffer[:ibest])
for i in range(ibest):
# print (i, buffer[i], result_f.mdot)
result_f = buffer.pop(0)
stime.append(result_f.t2[0])
salpha.append(result_f.alpha[0])
se_b.append(result_f.e_b[0])
# print (stime, salpha, se_b)
smdot.append(result_f.mdot[0])
imatch.append(len(stime) - 1)
sgap.append(False)
else:
# we don't have a match, so need to trigger the gap logic
gap = True
sgap.append(True)
if debug:
print(f'set gap={gap}! iburst={iburst} diff_2last={diff_2last} sep_last={sep_last}')
stime.append(bean.bstart[iburst + 1])
# and either way we're switching to the next burst in the observed train
iburst += 1
# print (f'+ve search: stime = {stime}, imatch = {imatch}, next = {bean.bstart[iburst+1]}')
result = dict()
if full_model:
# model parameters are redundant for the model returned
result["base"] = [base]
result["z"] = [z]
result["x_0"] = [x_0]
result["dist"] = [dist]
result["xi_p"] = [xi_p]
# result["mdot_max"] = [mdot_max]
result["mass"] = [mass]
result["radius"] = [radius]
# now the actual predictions
result["time"] = np.array(stime)
if len(stime) > 0:
# The simulation might fail to generate any bursts, so only add the arrays if they exist
result["mdot"] = np.array(smdot)
result["alpha"] = np.array(salpha)
result["e_b"] = np.array(se_b)
result["imatch"] = imatch
# print(f"In burstrain fluence is {se_b}")
result["gap"] = sgap
if debug:
print("{}: train complete, result={}".format(fn, result))
return result
# -------------------------------------------------------------------------#
[docs]def generate_burst_train( bean, base, x_0, z, dist, xi_p, mass, radius,
full_model=False, debug=False):
"""
This routine generates a simulated burst train based on the model
input parameters, and the mdot history inferred from the persistent
flux measurements (tobs, pflux, pfluxe)
:param bean: Beans object, from which the remaining parameters are drawn:
bstart, pflux, pfluxe, tobs, numburstssim, ref_ind
:param base: base flux [MeV/nucleon]
:param x_0: accreted H-fraction
:param z: accreted CNO metallicity
:param dist: source distance (kpc)
:param xi_p: anisotropy of persistent emission
:param mass: NS mass (M_sun)
:param radius: NS radius (km)
:param full_model: if set to True, include all the parameters in the
dict that is returned
:param debug: set to True to show additional debugging information
:return: a dictionary with the following keys: ``['time', 'mdot', 'alpha', 'e_b', 'iref']``, and optionally (if full_model is set to True) also: ``['base', 'z', 'x_0', 'dist', 'xi_p', 'mdot_max', 'mass', 'radius', 'forward', 'backward']`` (all but the last 2 are copies of the input parameters).
The main outputs are the ``time``, ``mdot``, ``alpha``, and ``e_b`` elements
(numpy arrays) which are the predicted properties of the bursts and
the accretion rate inferred for each interval.
We have one more element of the time array than the other arrays, because
we can't determine the properties for that burst, as we may not have
enough data to back project. So the ith element of e_b, alpha etc.
belongs with the (i+1)th element of time:
.. code-block:: text
time [ 0 1 2 3 4 5 6 7 ... n ]
mdot [ 0 1 2 3 4 5 6 ... n-1 ]
alpha [ 0 1 2 3 4 5 6 ... n-1 ]
e_b [ 0 1 2 3 4 5 6 ... n-1 ]
"""
fn = "generate_burst_train"
forward, backward = True, True # go in both directions at the start
mdot_max = -1
cfac = 1.0 # default to take the output of settle directly
# cfac = 0.65 # temporary to try to match the objective of Goodwin et
# al. (2019)
# Now to go ahead and try to simulate the bursts train with the resulting
# best set of parameters
# Start from the *second* (observed) burst in the train
# Originally this would have simulated four bursts following the reference,
# and three preceding. However, the last burst in the train (the 8th) for
# runs test17 were wildly variable, so now restrict the extent by one
if bean.bstart is not None:
sbt = bean.bstart[bean.ref_ind]
else:
# In the absence of any bursts, set the reference time to ref_ind
# (can be any time within the outburst)
# TODO check if this is consistent with the usage for initial
# value of earliest/latest below; not sure of the use case
# sbt = 0.0
sbt = bean.ref_ind
flag = 1 # Initially OK
stime, iref = [sbt], 0 # initialise list to store simulated times
salpha, se_b, smdot = [], [], [] # ditto for other lists
earliest = sbt # (running) time of the earliest burst in the train
latest = sbt # (running) time of the latest burst in the train
for i in range(0, bean.numburstssim):
assert earliest == stime[0] # I think these variables are redundant
assert latest == stime[-1]
if debug:
print ("{}: simulating burst {} of {}".format(fn, i, bean.numburstssim))
# Here we adopted recurrence time corrections for SAX
# J1808.4--3658 ,since the accretion rate is not constant over the
# extrapolated time, resulting in the recurrence time being
# underestimated by settle. Correction factors are from Zac
# Johnston, calculated using KEPLER
# if i == 0: # This is observed burst at 1.89 cfac1 = 1.02041
# cfac2 = 1.02041
# if (
# i == 1
# ): # to the right this is 3rd observed burst, to left it is predicted burst
# cfac1 = 1.00
# cfac2 = 1.1905
# if (
# i == 2
# ): # to the right this is 4th observed burst, to left is predicted burst
# cfac1 = 1.00
# cfac2 = 1.2346
# if (
# i == 3
# ): # to the right this is final predicted burst, to the left is first observed burst (note that cfac = 1.25 is estimated interpolation)
# cfac1 = 1.00
# cfac2 = 1.25
# if i == 4: # to the right this is final predicted burst, to the left is first observed burst (note that cfac = 1.25 is estimated interpolation)
# cfac1 = 0.98
# cfac2 = 1.27
if backward:
# Find the time for the *previous* burst in the train
result_b = next_burst( bean, base, x_0, z, earliest,
dist, xi_p, cfac, mass, radius, direction=-1, debug=debug)
if result_b is not None:
# we have a result from the next_burst call going backward, so add its properties to the arrays
stime.insert(0, result_b.t2[0])
iref += 1
salpha.insert(0, result_b.alpha[0])
se_b.insert(0, result_b.e_b[0])
smdot.insert(0, result_b.mdot[0])
earliest = result_b.t2[0]
else:
# if the earlier burst has failed, we don't need to pursue any further
if debug:
print("{}: abandoning backward search, step {}".format(fn, i))
backward = False
if forward:
# Also find the time for the *next* burst in the train
result_f = next_burst( bean, base, x_0, z, latest,
dist, xi_p, cfac, mass, radius, direction=1, debug=debug)
if result_f is not None:
# we have a result from the next_burst call going forward, so add its properties to the arrays
stime.append(result_f.t2[0])
salpha.append(result_f.alpha[0])
se_b.append(result_f.e_b[0])
smdot.append(result_f.mdot[0])
latest = result_f.t2[0]
else:
if debug:
print("{}: abandoning backward search, step {}".format(fn, i))
forward = False
# Check the results here
# I don't think t2 or t3 are ever set to these "dummy" values anymore
# if abs(t2) == 99.99 or abs(t3) == 99.99:
if not (forward or backward):
break
if (mdot_max == -1) & (len(stime) > 1):
# only run this if len(stime) > 1, because it is initialised as a
# list with a single value (sbt)
mdot_max = max(smdot)
result = dict()
if full_model:
# model parameters are redundant for the model returned
result["base"] = [base]
result["z"] = [z]
result["x_0"] = [x_0]
result["dist"] = [dist]
result["xi_p"] = [xi_p]
result["mdot_max"] = [mdot_max]
result["mass"] = [mass]
result["radius"] = [radius]
result["forward"] = forward # to keep track of the outcome of each direction
result["backward"] = backward
# now the actual predictions
result["time"] = np.array(stime)
if len(stime) > 0:
# The simulation might fail to generate any bursts, so only add the arrays if they exist
result["mdot"] = np.array(smdot)
result["iref"] = iref
result["alpha"] = np.array(salpha)
result["e_b"] = np.array(se_b)
#print(f"In burstrain fluence is {se_b}")
if debug:
print ("{}: train complete, result={}".format(fn, result))
return result
[docs]def burstensemble(bean, base, x_0, z, dist, xi_p, mass, radius, full_model=False ):
"""
This routine generates as many burst predictions as there are burst
measurements.
Written initially by Luke Waterson, 2021
:param bean: Beans object, from which the remaining parameters are drawn:
bstart, pflux, numburstsobs
:param base: base flux [MeV/nucleon]
:param z: accreted CNO metallicity
:param x_0: accreted H-fraction
:param dist: source distance (kpc)
:param xi_p: anisotropy of persistent emission
:param mass: NS mass (M_sun)
:param radius: NS radius (km)
:return: a dictionary with the following keys: ``['base', 'z', 'x_0', 'dist', 'xi_p', 'time', 'mdot_max', 'mdot', 'iref', 'alpha', 'e_b', 'mass', 'radius']``
The main outputs (as for :meth:`burstrain.generate_burst_train`) are
the ``time``, ``e_b``, ``alpha``, and ``mdot`` elements (numpy
arrays) which are the predicted properties of the bursts and the
accretion rate inferred for each interval.
"""
salpha = []
stime = []
smdot = []
se_b = []
mdot = bean.flux_to_mdot(x_0, dist, xi_p, mass, radius, bean.pflux)
if bean.model_name == 'grid_interp':
if np.any((mdot > bean.grid_mdot_max) |
(mdot < bean.grid_mdot_min)):
return None
for i in range(0, bean.numburstsobs):
tmp = bean.model(base, z, x_0, mdot[i], mass, radius,
corr=bean.corr, interpolator=bean.gi)
# accumulate the predictions into the arrays here
se_b.append(tmp.E_b[0])
salpha.append(tmp.alpha[0])
smdot.append(mdot[i])
# stime.append(bstart[i])
stime.append(tmp.tdel[0])
mdot_max = max(smdot)
result = dict()
if full_model:
# model parameters are redundant for the model returned
result["base"] = [base]
result["z"] = [z]
result["x_0"] = [x_0]
result["dist"] = [dist]
result["xi_p"] = [xi_p]
result["mdot_max"] = [mdot_max]
result["mass"] = [mass]
result["radius"] = [radius]
# now the actual predictions
result["time"] = np.array(stime)
result["mdot"] = np.array(smdot)
result["alpha"] = np.array(salpha)
result["e_b"] = np.array(se_b)
# omit the printing for now, as it prevents assessing the progress
# print('ensemble')
# print(f"In burstrain fluence is {se_b}")
return result