Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ before_install:
install:
- pip install .
script:
- travis_wait 30 nosetests
- travis_wait 40 nosetests
deploy:
provider: pypi
distributions: sdist bdist_wheel
Expand Down
20 changes: 11 additions & 9 deletions pydream/Dream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from . import Dream_shared_vars
from datetime import datetime
import traceback
import multiprocessing as mp
from multiprocessing import pool
import multiprocess as mp
from multiprocess import pool
import time

class Dream():
Expand Down Expand Up @@ -70,7 +70,13 @@ def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossov
self.mp_context = mp_context
# Set model and variable attributes (if no variables passed, set to all parameters)
self.model = model
self.model_name = model_name
if not model_name:
# Replace `:` for `_` because colons cause problems in windows paths
prefix = datetime.now().strftime('%Y_%m_%d_%H_%M_%S_')
else:
prefix = model_name + '_'
self.model_name = prefix

if variables is None:
self.variables = self.model.sampled_parameters
else:
Expand Down Expand Up @@ -937,12 +943,8 @@ def record_history(self, nseedchains, ndimensions, q_new, len_history):

Dream_shared_vars.count.value += 1
if self.save_history and len_history == (nhistoryrecs+1)*ndimensions:
if not self.model_name:
prefix = datetime.now().strftime('%Y_%m_%d_%H:%M:%S')+'_'
else:
prefix = self.model_name+'_'

self.save_history_to_disc(np.frombuffer(Dream_shared_vars.history.get_obj()), prefix)
self.save_history_to_disc(np.frombuffer(Dream_shared_vars.history.get_obj()), self.model_name)

def save_history_to_disc(self, history, prefix):
"""Save history and crossover probabilities to files at end of run.
Expand Down Expand Up @@ -1015,7 +1017,7 @@ def daemon(self, val):
pass


from multiprocessing import context
from multiprocess import context


# Exists on all platforms
Expand Down
116 changes: 73 additions & 43 deletions pydream/core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-

import numpy as np
import multiprocessing as mp
import collections
import multiprocess as mp
from . import Dream_shared_vars
from .Dream import Dream, DreamPool
from .model import Model
Expand All @@ -27,6 +28,9 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
Whether run is a continuation of an earlier run. Pass this with the model_name argument to automatically load previous history and crossover probability files. Default: False
verbose: Boolean, optional
Whether to print verbose output (including acceptance or rejection of moves and the current acceptance rate). Default: True
nverbose: int, optional
Rate at which the acceptance rate is printed if verbose is set to True. Every n-th iteration the acceptance rate
will be printed and added to the acceptance rate file. Default: 10
tempering: Boolean, optional
Whether to use parallel tempering for the DREAM chains. Warning: this feature is untested. Use at your own risk! Default: False
mp_context: multiprocessing context or None.
Expand All @@ -53,15 +57,24 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
parameters = [parameters]

model = Model(likelihood=likelihood, sampled_parameters=parameters)

if restart:
model_prefix = kwargs['model_name']
Comment thread
alubbock marked this conversation as resolved.
step_instance = Dream(model=model, variables=parameters,
history_file=kwargs['model_name'] + '_DREAM_chain_history.npy',
crossover_file=kwargs['model_name'] + '_DREAM_chain_adapted_crossoverprob.npy',
gamma_file=kwargs['model_name'] + '_DREAM_chain_adapted_gammalevelprob.npy',
history_file=model_prefix + '_DREAM_chain_history.npy',
Comment thread
alubbock marked this conversation as resolved.
crossover_file=model_prefix + '_DREAM_chain_adapted_crossoverprob.npy',
gamma_file=model_prefix + '_DREAM_chain_adapted_gammalevelprob.npy',
verbose=verbose, mp_context=mp_context, **kwargs)

# Reload acceptance rate data
chains_naccepts_iterations = []
for chain in range(nchains):
na = np.load(f'{model_prefix}_naccepts_chain{chain}.npy')
chains_naccepts_iterations.append(na)

else:
step_instance = Dream(model=model, variables=parameters, verbose=verbose, mp_context=mp_context, **kwargs)
chains_naccepts_iterations = [np.zeros((2, 1), dtype=np.int)] * nchains

pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start, mp_context=mp_context)
try:
Expand All @@ -71,15 +84,20 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,

else:

if type(start) is list:
args = zip([step_instance]*nchains, [niterations]*nchains, start, [verbose]*nchains, [nverbose]*nchains)

else:
args = list(zip([step_instance]*nchains, [niterations]*nchains, [start]*nchains, [verbose]*nchains, [nverbose]*nchains))
if not isinstance(start, collections.abc.Iterable):
start = [start] * nchains
args = list(zip([step_instance] * nchains, [niterations] * nchains, start, [verbose] * nchains,
[nverbose]*nchains, list(range(nchains)), chains_naccepts_iterations))

returned_vals = pool.map(_sample_dream, args)
sampled_params = [val[0] for val in returned_vals]
log_ps = [val[1] for val in returned_vals]
acceptance_rates = [val[2] for val in returned_vals]

for chain in range(nchains):
filename = f'{step_instance.model_name}_acceptance_rates_chain{chain}.txt'
with open(filename, 'ab') as f:
np.savetxt(f, acceptance_rates[chain])
finally:
pool.close()
pool.join()
Expand All @@ -88,21 +106,31 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,

def _sample_dream(args):

try:
try:
dream_instance = args[0]
iterations = args[1]
start = args[2]
verbose = args[3]
nverbose = args[4]
chain_idx = args[5]
naccepts_iterations_total = args[6]
step_fxn = getattr(dream_instance, 'astep')
sampled_params = np.empty((iterations, dream_instance.total_var_dimension))
log_ps = np.empty((iterations, 1))
acceptance_rates_size = int(np.floor(iterations / nverbose))
if acceptance_rates_size == 0:
acceptance_rates_size = 1
acceptance_rates = np.zeros(acceptance_rates_size)
q0 = start
naccepts = 0
iterations_total = np.sum(naccepts_iterations_total[1])
naccepts = naccepts_iterations_total[0][-1]
naccepts100win = 0
for iteration in range(iterations):
acceptance_counter = 0
for iteration_idx, iteration in enumerate(range(iterations_total, iterations_total + iterations)):
if iteration%nverbose == 0:
acceptance_rate = float(naccepts)/(iteration+1)
acceptance_rates[acceptance_counter] = acceptance_rate
acceptance_counter += 1
if verbose:
print('Iteration: ',iteration,' acceptance rate: ',acceptance_rate)
if iteration%100 == 0:
Expand All @@ -111,47 +139,50 @@ def _sample_dream(args):
print('Iteration: ',iteration,' acceptance rate over last 100 iterations: ',acceptance_rate_100win)
naccepts100win = 0
old_params = q0
sampled_params[iteration], log_prior , log_like = step_fxn(q0)
log_ps[iteration] = log_like + log_prior
q0 = sampled_params[iteration]
sampled_params[iteration_idx], log_prior, log_like = step_fxn(q0)
log_ps[iteration_idx] = log_like + log_prior
q0 = sampled_params[iteration_idx]
if old_params is None:
old_params = q0

if np.any(q0 != old_params):
naccepts += 1
naccepts100win += 1


naccepts_iterations_total = np.append(naccepts_iterations_total, np.array([[naccepts], [iterations]]), axis=1)
np.save(f'{dream_instance.model_name}_naccepts_chain{chain_idx}.npy', naccepts_iterations_total)

except Exception as e:
traceback.print_exc()
print()
raise e

return sampled_params, log_ps
return sampled_params, log_ps, acceptance_rates

def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):

T = np.zeros((nchains))
T[0] = 1.
for i in range(nchains):
T[i] = np.power(.001, (float(i)/nchains))
step_instances = [step_instance]*nchains

step_instances = [step_instance]*nchains

if type(start) is list:
args = list(zip(step_instances, start, T, [None]*nchains, [None]*nchains))
else:
args = list(zip(step_instances, [start]*nchains, T, [None]*nchains, [None]*nchains))

sampled_params = np.zeros((nchains, niterations*2, step_instance.total_var_dimension))
log_ps = np.zeros((nchains, niterations*2, 1))

q0 = start
naccepts = np.zeros((nchains))
naccepts100win = np.zeros((nchains))
nacceptsT = np.zeros((nchains))
nacceptsT100win = np.zeros((nchains))
ttestsper100 = 100./nchains

for iteration in range(niterations):
itidx = iteration*2
if iteration%10 == 0:
Expand All @@ -176,8 +207,8 @@ def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
logprinews = [val[1] for val in returned_vals]
loglikenews = [val[2] for val in returned_vals]
dream_instances = [val[3] for val in returned_vals]
logpnews = [T[i]*loglikenews[i] + logprinews[i] for i in range(nchains)]
logpnews = [T[i]*loglikenews[i] + logprinews[i] for i in range(nchains)]

for chain in range(nchains):
sampled_params[chain][itidx] = qnews[chain]
log_ps[chain][itidx] = logpnews[chain]
Expand All @@ -189,17 +220,16 @@ def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
T2 = T[random_chains[1]]
logp1 = logpnews[random_chains[0]]
logp2 = logpnews[random_chains[1]]



alpha = ((T1*loglike2)+(T2*loglike1))-((T1*loglike1)+(T2*loglike2))

if np.log(np.random.uniform()) < alpha:
if verbose:
print('Accepted temperature swap of chains: ',random_chains,' at temperatures: ',T1,' and ',T2,' and logps: ',logp1,' and ',logp2)
nacceptsT[random_chains[0]] += 1
nacceptsT[random_chains[1]] += 1
nacceptsT100win[random_chains[0]] += 1
nacceptsT100win[random_chains[1]] += 1
nacceptsT100win[random_chains[1]] += 1
old_qs = list(qnews)
old_logps = list(logpnews)
old_loglikes = list(loglikenews)
Expand All @@ -215,11 +245,11 @@ def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
else:
if verbose:
print('Did not accept temperature swap of chains: ',random_chains,' at temperatures: ',T1,' and ',T2,' and logps: ',logp1,' and ',logp2)

for chain in range(nchains):
sampled_params[chain][itidx+1] = qnews[chain]
log_ps[chain][itidx+1] = logpnews[chain]

for i, q in enumerate(qnews):
try:
if not np.all(q == q0[i]):
Expand All @@ -228,12 +258,12 @@ def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
except TypeError:
#On first iteration without starting points this will fail because q0 == None
pass

args = list(zip(dream_instances, qnews, T, loglikenews, logprinews))
q0 = qnews

return sampled_params, log_ps


def _sample_dream_pt_chain(args):

Expand All @@ -244,11 +274,11 @@ def _sample_dream_pt_chain(args):
last_logpri = args[4]
step_fxn = getattr(dream_instance, 'astep')
q1, logprior1, loglike1 = step_fxn(start, T, last_loglike, last_logpri)

return q1, logprior1, loglike1, dream_instance

def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_context=None):

min_njobs = (2*len(step_instance.DEpairs))+1
if nchains < min_njobs:
raise Exception('Dream should be run with at least (2*DEpairs)+1 number of chains. For current algorithmic settings, set njobs>=%s.' %str(min_njobs))
Expand All @@ -266,12 +296,12 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_
arr_dim = ((np.floor(nchains*niterations/step_instance.history_thin)+nchains)*step_instance.total_var_dimension)+(step_instance.nseedchains*step_instance.total_var_dimension)
else:
arr_dim = np.floor(((nchains*niterations/step_instance.history_thin)*step_instance.total_var_dimension))+(step_instance.nseedchains*step_instance.total_var_dimension)

min_nseedchains = 2*len(step_instance.DEpairs)*nchains

if step_instance.nseedchains < min_nseedchains:
raise Exception('The size of the seeded starting history is insufficient. Increase nseedchains>=%s.' %str(min_nseedchains))

current_position_dim = nchains*step_instance.total_var_dimension
# Get context to define arrays
if mp_context is None:
Expand All @@ -295,10 +325,10 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_
shared_nchains = ctx.Value('i', nchains)
n = ctx.Value('i', 0)
tf = ctx.Value('c', b'F')

if step_instance.crossover_burnin == None:
step_instance.crossover_burnin = int(np.floor(niterations/10))

if start_pt != None:
if step_instance.start_random:
print('Warning: start position provided but random_start set to True. Overrode random_start value and starting walk at provided start position.')
Expand Down
8 changes: 5 additions & 3 deletions pydream/tests/test_dream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import unittest
from os import remove

import multiprocessing as mp
import multiprocess as mp
import numpy as np
import pydream.Dream_shared_vars
from pydream.Dream import Dream
Expand Down Expand Up @@ -575,8 +575,10 @@ def test_mp_sampledreamfxn(self):
start = np.array([-7, 8, 1.2, 0])
verbose = False
nverbose = 10
args = [dream, iterations, start, verbose, nverbose]
sampled_params, logps = _sample_dream(args)
chain_idx = 0
chains_naccepts_iterations = np.zeros((2, 1), dtype=np.int)
args = [dream, iterations, start, verbose, nverbose, chain_idx, chains_naccepts_iterations]
sampled_params, logps, acceptance_rates = _sample_dream(args)

self.assertEqual(len(sampled_params), 10)
self.assertEqual(len(sampled_params[0]), 4)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def readme():
author='Erin Shockley',
author_email='erin.shockley@vanderbilt.edu',
packages=['pydream'],
install_requires=['numpy', 'scipy'],
install_requires=['multiprocess', 'numpy', 'scipy'],
zip_safe=False,
test_suite='nose.collector',
tests_require=['nose'],
Expand Down