mirror of
https://github.com/autistic-symposium/ml-ai-agents-py.git
synced 2025-04-25 10:09:09 -04:00
130 lines
3.5 KiB
Python
130 lines
3.5 KiB
Python
import tensorflow as tf
|
|
import numpy as np
|
|
|
|
from tensorflow.python.platform import flags
|
|
flags.DEFINE_bool('proposal_debug', False, 'Print hmc acceptance raes')
|
|
|
|
FLAGS = flags.FLAGS
|
|
|
|
def kinetic_energy(velocity):
|
|
"""Kinetic energy of the current velocity (assuming a standard Gaussian)
|
|
(x dot x) / 2
|
|
|
|
Parameters
|
|
----------
|
|
velocity : tf.Variable
|
|
Vector of current velocity
|
|
|
|
Returns
|
|
-------
|
|
kinetic_energy : float
|
|
"""
|
|
return 0.5 * tf.square(velocity)
|
|
|
|
def hamiltonian(position, velocity, energy_function):
|
|
"""Computes the Hamiltonian of the current position, velocity pair
|
|
|
|
H = U(x) + K(v)
|
|
|
|
U is the potential energy and is = -log_posterior(x)
|
|
|
|
Parameters
|
|
----------
|
|
position : tf.Variable
|
|
Position or state vector x (sample from the target distribution)
|
|
velocity : tf.Variable
|
|
Auxiliary velocity variable
|
|
energy_function
|
|
Function from state to position to 'energy'
|
|
= -log_posterior
|
|
|
|
Returns
|
|
-------
|
|
hamitonian : float
|
|
"""
|
|
batch_size = tf.shape(velocity)[0]
|
|
kinetic_energy_flat = tf.reshape(kinetic_energy(velocity), (batch_size, -1))
|
|
return tf.squeeze(energy_function(position)) + tf.reduce_sum(kinetic_energy_flat, axis=[1])
|
|
|
|
def leapfrog_step(x0,
|
|
v0,
|
|
neg_log_posterior,
|
|
step_size,
|
|
num_steps):
|
|
|
|
# Start by updating the velocity a half-step
|
|
v = v0 - 0.5 * step_size * tf.gradients(neg_log_posterior(x0), x0)[0]
|
|
|
|
# Initalize x to be the first step
|
|
x = x0 + step_size * v
|
|
|
|
for i in range(num_steps):
|
|
# Compute gradient of the log-posterior with respect to x
|
|
gradient = tf.gradients(neg_log_posterior(x), x)[0]
|
|
|
|
# Update velocity
|
|
v = v - step_size * gradient
|
|
|
|
# x_clip = tf.clip_by_value(x, 0.0, 1.0)
|
|
# x = x_clip
|
|
# v_mask = 1 - 2 * tf.abs(tf.sign(x - x_clip))
|
|
# v = v * v_mask
|
|
|
|
# Update x
|
|
x = x + step_size * v
|
|
|
|
# x = tf.clip_by_value(x, -0.01, 1.01)
|
|
|
|
# x = tf.Print(x, [tf.reduce_min(x), tf.reduce_max(x), tf.reduce_mean(x)])
|
|
|
|
# Do a final update of the velocity for a half step
|
|
v = v - 0.5 * step_size * tf.gradients(neg_log_posterior(x), x)[0]
|
|
|
|
# return new proposal state
|
|
return x, v
|
|
|
|
def hmc(initial_x,
|
|
step_size,
|
|
num_steps,
|
|
neg_log_posterior):
|
|
"""Summary
|
|
|
|
Parameters
|
|
----------
|
|
initial_x : tf.Variable
|
|
Initial sample x ~ p
|
|
step_size : float
|
|
Step-size in Hamiltonian simulation
|
|
num_steps : int
|
|
Number of steps to take in Hamiltonian simulation
|
|
neg_log_posterior : str
|
|
Negative log posterior (unnormalized) for the target distribution
|
|
|
|
Returns
|
|
-------
|
|
sample :
|
|
Sample ~ target distribution
|
|
"""
|
|
|
|
v0 = tf.random_normal(tf.shape(initial_x))
|
|
x, v = leapfrog_step(initial_x,
|
|
v0,
|
|
step_size=step_size,
|
|
num_steps=num_steps,
|
|
neg_log_posterior=neg_log_posterior)
|
|
|
|
orig = hamiltonian(initial_x, v0, neg_log_posterior)
|
|
current = hamiltonian(x, v, neg_log_posterior)
|
|
|
|
prob_accept = tf.exp(orig - current)
|
|
|
|
if FLAGS.proposal_debug:
|
|
prob_accept = tf.Print(prob_accept, [tf.reduce_mean(tf.clip_by_value(prob_accept, 0, 1))])
|
|
|
|
uniform = tf.random_uniform(tf.shape(prob_accept))
|
|
keep_mask = (prob_accept > uniform)
|
|
# print(keep_mask.get_shape())
|
|
|
|
x_new = tf.where(keep_mask, x, initial_x)
|
|
return x_new
|