Source code for asterogap.GP

import numpy as np
import george
import emcee
import scipy.stats
import pandas as pd
import os


class GPFit:
    """Fit a Gaussian process model to asteroid light-curve data using george and emcee."""

    def __init__(self, time_stamps, flux, flux_error, kernel_long):
        self.time = time_stamps
        self.flux = flux
        self.flux_err = flux_error
        self.kernel_long = kernel_long
        self.data_pts = len(time_stamps)
        self.params = None
        self.walker_params = None
        self.gp = None
        self.sampler = None
        self.lsp_period = None
        self.kde = log_period_prior_kde()
    def set_params(self):
        """Calculates initial GP parameter values based on the data."""
        mean_flux = np.mean(self.flux)

        # kernel 1: long-term phase change
        log_amp_k1 = np.log(self.flux.max() - self.flux.min())
        log_metric = np.log(5 ** 2)

        # kernel 2: short-term periodic
        log_amp_k2 = np.log(self.flux.max() - self.flux.min())
        gamma = 10  # gamma ~ 10 is a typical value for the periodic kernel
        log_period = 0  # log(1 day) = 0, i.e. a 24-hour period

        if self.kernel_long:
            parameters = {
                "mean": mean_flux,
                "log_amp_k1": log_amp_k1,
                "log_metric": log_metric,
                "log_amp_k2": log_amp_k2,
                "gamma": gamma,
                "log_period": log_period,
            }
        else:
            parameters = {
                "mean": mean_flux,
                "log_amp_k2": log_amp_k2,
                "gamma": gamma,
                "log_period": log_period,
            }

        self.params = parameters
        return
    def set_walker_param_matrix(self, nwalkers, seed=None):
        """Creates a matrix of starting parameters for every walker."""
        if self.params is not None:
            # create an array of all parameter values
            p_start = np.array(list(self.params.values()))

            # scatter the walkers around the initial parameter values
            p0 = scipy.stats.multivariate_normal.rvs(
                p_start, size=nwalkers, random_state=seed
            )

            # draw starting log-period values from the LCDB-based KDE prior
            p0_samples = self.kde.resample(nwalkers)
            p0[:, -1] = p0_samples.flatten()

            self.walker_params = p0
        else:
            print("Please set parameter values first.")

        return
    def set_gp_kernel(self):
        """Sets up the Gaussian process kernel that is needed for george."""
        if self.kernel_long:
            k1 = np.exp(self.params["log_amp_k1"]) * george.kernels.ExpSquaredKernel(
                metric=np.exp(self.params["log_metric"])
            )

        k2 = np.exp(self.params["log_amp_k2"]) * george.kernels.ExpSine2Kernel(
            gamma=self.params["gamma"], log_period=self.params["log_period"]
        )

        if self.kernel_long:
            kernel = k1 * k2
        else:
            kernel = k2

        gp = george.GP(kernel, fit_mean=True, mean=self.params["mean"])
        gp.compute(self.time, self.flux_err)

        self.gp = gp
        return
    def run_emcee(self, nwalkers, niter, threads, burn_in):
        """Runs emcee's MCMC code."""

        ndim = self.gp.vector_size  # ndim is 4 if the long-term kernel is not included

        sampler = emcee.EnsembleSampler(
            nwalkers,
            ndim,
            post_lnlikelihood,
            args=[self.gp, self.time, self.flux, self.flux_err, self.kde],
            threads=threads,
        )

        # run steps for a burn-in
        state = sampler.run_mcmc(self.walker_params, burn_in)
        sampler.reset()

        # run the production chain starting from the end of the burn-in
        sampler.run_mcmc(state.coords, niter)
        self.sampler = sampler

        return sampler
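# ----------------------------------------------------------------------------
# Illustrative usage sketch (not part of the asterogap API): the intended call
# order for GPFit on a simulated light curve, with placeholder walker and
# iteration counts. Constructing the object calls log_period_prior_kde(), so
# this assumes the LCDB file is available locally or can be downloaded.
#
#   time = np.sort(np.random.uniform(0, 5, 200))            # days
#   flux = 1 + 0.1 * np.sin(2 * np.pi * time / (6 / 24.0))  # 6-hour period
#   flux_err = np.full_like(flux, 0.01)
#
#   fit = GPFit(time, flux, flux_err, kernel_long=False)
#   fit.set_params()
#   fit.set_walker_param_matrix(nwalkers=32, seed=42)
#   fit.set_gp_kernel()
#   sampler = fit.run_emcee(nwalkers=32, niter=500, threads=1, burn_in=100)
# ----------------------------------------------------------------------------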
def prior(params, kde):
    """
    Calculates the log of the prior values, given parameter values.

    Parameters
    ----------
    params : list
        List of all kernel parameters.
        param[0] : float
            mean (between 0 and 2)
        param[1] : float
            k1 log constant
        param[2] : float
            k1 log metric
        param[3] : float
            k2 log amplitude (between -10 and 10)
        param[4] : float
            gamma (log gamma roughly between 0.1 and 40)
        param[5] : float
            log period (period between about 1 minute and 1 year)

    kde : scipy.stats.gaussian_kde
        KDE of the log-period prior built from the LCDB periods.

    Returns
    -------
    sum_log_prior : float
        Sum of all log priors (-inf if a parameter is out of range).
    """
    # TODO: Improve documentation for prior ranges

    if len(params) == 6:
        p_mean = scipy.stats.norm(1, 0.5).logpdf(params[0])
        p_log_amp_k1 = scipy.stats.uniform(-10, 20).logpdf(params[1])
        p_log_metric = scipy.stats.norm(np.log(100), np.log(10)).logpdf(params[2])
        p_log_amp_k2 = scipy.stats.norm(np.log(2), np.log(2)).logpdf(params[3])
        p_log_gamma = scipy.stats.norm(np.log(10), np.log(2)).logpdf(np.log(params[4]))

        # if the period is smaller than ~1 minute or larger than ~1 year, return -inf
        if params[5] < -7.27:
            p_log_period = -np.inf
        elif params[5] > 5.9:
            p_log_period = -np.inf
        else:
            p_log_period = kde.evaluate(params[5])

        sum_log_prior = (
            p_mean
            + p_log_amp_k1
            + p_log_metric
            + p_log_amp_k2
            + p_log_gamma
            + p_log_period
        )

    else:
        p_mean = scipy.stats.norm(1, 0.5).logpdf(params[0])
        p_log_amp_k2 = scipy.stats.norm(np.log(2), np.log(2)).logpdf(params[1])
        p_log_gamma = scipy.stats.norm(np.log(10), np.log(2)).logpdf(np.log(params[2]))

        # without the long-term kernel, the log period is the last (fourth) parameter
        if params[3] < -7.27:
            p_log_period = -np.inf
        elif params[3] > 5.9:
            p_log_period = -np.inf
        else:
            p_log_period = kde.evaluate(params[3])

        sum_log_prior = p_mean + p_log_amp_k2 + p_log_gamma + p_log_period

    if np.isnan(sum_log_prior):
        return -np.inf

    return sum_log_prior
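# Note on the hard period cuts above (illustrative arithmetic): log_period is
# the natural log of the period in days, so
#   np.exp(-7.27) * 24 * 60  ->  ~1.0 minute
#   np.exp(5.9) / 365.25     ->  ~1.0 year
# which is the "1 minute to 1 year" range referred to in the comments.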
def logl(params, gp, tsample, fsample, flux_err):
    """Compute the log likelihood based on the given parameters."""

    gp.set_parameter_vector(params)

    try:
        gp.compute(tsample, flux_err)
        lnlike = gp.lnlikelihood(fsample)
    except np.linalg.LinAlgError:
        lnlike = -1e25

    return lnlike
def post_lnlikelihood(params, gp, tsample, fsample, flux_err, kde):
    """
    Calculates the log posterior from the log prior and log likelihood.

    Parameters
    ----------
    params : list
        List of all kernel parameters.

    Returns
    -------
    ln_likelihood : float
        The log posterior, unless it is not finite, in which case -1e25
        is returned instead.
    """
    # calculate the log prior
    log_prior = prior(params, kde)

    # return -inf if parameters are outside the prior ranges
    if np.isneginf(log_prior):
        return -np.inf

    try:
        lnlike = logl(params, gp, tsample, fsample, flux_err)
        ln_likelihood = lnlike + log_prior
    except np.linalg.LinAlgError:
        ln_likelihood = -1e25

    return ln_likelihood if np.isfinite(ln_likelihood) else -1e25
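# Illustrative check (comments only, not executed): evaluating the posterior
# directly for a single 4-parameter vector [mean, log_amp_k2, gamma, log_period],
# using a stand-in KDE built from synthetic log-periods instead of the
# LCDB-based one.
#
#   rng = np.random.default_rng(0)
#   kde = scipy.stats.gaussian_kde(rng.normal(np.log(6 / 24.0), 0.8, 1000))
#   t = np.sort(rng.uniform(0, 2, 50))
#   f = 1 + 0.05 * np.sin(2 * np.pi * t / 0.25)      # 0.25 days = 6 hours
#   ferr = np.full_like(f, 0.01)
#   kernel = 0.1 * george.kernels.ExpSine2Kernel(gamma=10, log_period=np.log(0.25))
#   gp = george.GP(kernel, fit_mean=True, mean=1.0)
#   gp.compute(t, ferr)
#   print(post_lnlikelihood([1.0, np.log(0.1), 10, np.log(0.25)],
#                           gp, t, f, ferr, kde))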
def read_lcdb(filename='LC_SUM_PUB.TXT'):
    """Read an LCDB file tabulating periods and associated information.

    LCDB available from:
    http://www.minorplanet.info/lightcurvedatabase.html
    Please see that page for citation information.
    Warner, B.D., Harris, A.W., Pravec, P. (2009). Icarus 202, 134-146.
    Updated <Date of last update>.
    http://www.MinorPlanet.info/lightcurvedatabase.html

    The Asteroid Lightcurve Database is a listing of asteroid lightcurve
    parameters and other information, e.g., estimated/measured diameters,
    absolute magnitudes (H), phase slope parameters (G), albedos, and more.

    Parameters
    ----------
    filename : str, opt
        The full path to the LCDB file (LC_SUM_PUB.TXT or similarly formatted files).

    Returns
    -------
    pandas.DataFrame
    """
    names = ['numberId', 'new', 'Name', 'Desig', 'family', 'Csource', 'class',
             'Dsource', 'Dflag', 'diameter', 'Hsource', 'H', 'Hband',
             'Asource', 'Albedoflag', 'albedo', 'Pflag', 'period', 'Pdescr',
             'Ampflag', 'AmpMin', 'AmpMax', 'U', 'notes', 'binary', 'priv',
             'pole', 'survA']
    colspecs = [(0, 7), (8, 9), (10, 40), (41, 61), (62, 70), (71, 72), (73, 78),
                (79, 80), (81, 82), (83, 92), (93, 94), (95, 100), (101, 102),
                (104, 105), (106, 107), (108, 114), (115, 116), (117, 130),
                (131, 146), (147, 148), (149, 153), (154, 158), (159, 161),
                (162, 167), (168, 171), (172, 175), (176, 179), (180, 182),
                (183, 184)]
    lcdata = pd.read_fwf(filename, index_col=False, skiprows=5,
                         names=names, colspecs=colspecs)

    # build a single object identifier from the number, designation, or name
    objId = lcdata.query('numberId == numberId')['numberId'].astype('int').astype('str')
    objId2 = lcdata.query('numberId != numberId and Desig == Desig')['Desig']
    objId3 = lcdata.query('numberId != numberId and Desig != Desig')['Name']
    t = pd.concat([objId, objId2, objId3])
    lcdata = lcdata.merge(t.to_frame(name='objId'), left_index=True, right_index=True)

    lcdata['Frequency'] = 1.0 / lcdata.period

    # Map the 'new' flag to ints.
    tmp = lcdata.new.values
    tmp = np.where(tmp == '*', 1, 0)
    lcdata.new = tmp

    return lcdata


def log_period_prior_kde(lcdb_file="LCLIST_PUB_CURRENT/LC_SUM_PUB.TXT"):
    """
    Calculates the log-period prior distribution based on the asteroid
    rotational periods currently documented in the LCDB.
    """
    from os.path import expanduser

    userhome = expanduser("~")
    file_loc = userhome + "/asterogap/data/"

    if not os.path.isfile(file_loc + lcdb_file):
        # If this download doesn't work, you can always go to
        # http://www.minorplanet.info/lightcurvedatabase.html,
        # download and unzip the latest public release file,
        # and then place that file into the data folder of this package.
        print("%s is not found." % lcdb_file)
        print("Downloading the latest public release from "
              "http://www.minorplanet.info/lightcurvedatabase.html.")

        import requests
        import zipfile

        zip_url = "http://www.minorplanet.info/datazips/LCLIST_PUB_CURRENT.zip"

        # send an HTTP request to the server and save
        # the HTTP response in a response object called r
        r = requests.get(zip_url)

        with open("LCLIST_PUB_CURRENT.zip", 'wb') as f:
            # write the contents of the response (r.content)
            # to a new file in binary mode
            # (this should take a few seconds)
            f.write(r.content)

        with zipfile.ZipFile("LCLIST_PUB_CURRENT.zip", 'r') as zip_ref:
            zip_ref.extractall(file_loc + "LCLIST_PUB_CURRENT")

        # go ahead and delete the zip file once the LCDB file is in place
        if os.path.isfile(file_loc + lcdb_file):
            os.remove("LCLIST_PUB_CURRENT.zip")

    lcdb = read_lcdb(file_loc + lcdb_file)

    # drop any entries that have NaN for a period value and convert from hours to days
    x = np.log(lcdb.period.dropna().values / 24.)

    kde = scipy.stats.gaussian_kde(x, bw_method=0.2 / x.std(ddof=1))

    return kde
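# A minimal, hedged demo of the KDE prior itself; the sample size is a
# placeholder, and running it requires the LCDB file (or network access to
# download it).
if __name__ == "__main__":
    kde = log_period_prior_kde()
    # draw log-periods (in log-days) and report them in hours
    draws = kde.resample(1000).flatten()
    periods_hr = np.exp(draws) * 24.0
    print("median prior period: %.2f hours" % np.median(periods_hr))
    print("68%% interval: %.2f - %.2f hours"
          % tuple(np.percentile(periods_hr, [16, 84])))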