Workbench (#303)
* begin workbench
* cleanup
* begin procgen config integration
* arg tweaks
* more args
* parameter saving
* begin procgen enjoy
* tweaks
* more workbench
* more args sync/restore
* cleanup
* merge in master
* rework args priority
* more workbench
* more logging
* impala cnn
* impala lstm
* tweak
* tweaks
* rl19 time logging
* misc fixes
* faster pipeline
* update local.py
* sess and log config tweaks
* num processes
* logging tweaks
* difficulty reward wrapper
* logging fixes
* gin tweaks
* tweak
* fix
* task id
* param loading
* more variable loading
* entrypoint
* tweak
* ksync
* restore lstm
* begin rl19 support
* tweak
* rl19 rnn
* more rl19 integration
* fix
* cleanup
* restore rl19 rnn
* cleanup
* cleanup
* wrappers.get_log_info
* cleanup
* cleanup
* directory cleanup
* logging, num_experiments
* fixes
* cleanup
* gin fixes
* fix local max gpu
* resid nx
* num machines and download params
* rename
* cleanup
* create workbench
* more reorg
* fix
* more logging wrappers
* lint fix
* restore train procgen
* restore train procgen
* pylint fix
* better wrapping
* config sweep
* args sweep
* test workers
* mpi_weight
* train test comm and high difficulty fix
* enjoy show returns
* removing gin, procgen_parser
* removing gin
* procgen args
* config fixes
* cleanup
* cleanup
* procgen args fix
* fix
* rcall syncing
* lint
* rename mpi_weight
* use username for sync
* fixes
* microbatch fix
committed by Peter Zhokhov
parent 376fd88bb8
commit 5082e5d34b
@@ -9,22 +9,27 @@ except ImportError:
 class MpiAdamOptimizer(tf.train.AdamOptimizer):
     """Adam optimizer that averages gradients across mpi processes."""
-    def __init__(self, comm, **kwargs):
+    def __init__(self, comm, mpi_rank_weight=1, **kwargs):
         self.comm = comm
+        self.mpi_rank_weight = mpi_rank_weight
         tf.train.AdamOptimizer.__init__(self, **kwargs)
     def compute_gradients(self, loss, var_list, **kwargs):
         grads_and_vars = tf.train.AdamOptimizer.compute_gradients(self, loss, var_list, **kwargs)
         grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None]
-        flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0)
+        flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0) * self.mpi_rank_weight
         shapes = [v.shape.as_list() for g, v in grads_and_vars]
         sizes = [int(np.prod(s)) for s in shapes]
-        num_tasks = self.comm.Get_size()
+
+        total_weight = np.zeros(1, np.float32)
+        self.comm.Allreduce(np.array([self.mpi_rank_weight], dtype=np.float32), total_weight, op=MPI.SUM)
+        total_weight = total_weight[0]
+
         buf = np.zeros(sum(sizes), np.float32)
         countholder = [0] # Counts how many times _collect_grads has been called
         stat = tf.reduce_sum(grads_and_vars[0][1]) # sum of first variable
         def _collect_grads(flat_grad, np_stat):
             self.comm.Allreduce(flat_grad, buf, op=MPI.SUM)
-            np.divide(buf, float(num_tasks), out=buf)
+            np.divide(buf, float(total_weight), out=buf)
             if countholder[0] % 100 == 0:
                 check_synced(np_stat, self.comm)
             countholder[0] += 1
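The net effect of this hunk: each rank scales its flattened gradient by mpi_rank_weight, and the Allreduce sum is divided by the sum of weights (total_weight) instead of by the number of processes (num_tasks), so a rank with mpi_rank_weight=0 still takes part in the collective without influencing the update. Below is a minimal numpy + mpi4py sketch of that weighted averaging outside TensorFlow; the even/odd rank split and the names local_grad / mpi_rank_weight are illustrative assumptions, not part of the commit.

# Minimal sketch of the weighted gradient averaging above (numpy + mpi4py only).
# Assumption for illustration: even ranks are training workers (weight 1),
# odd ranks are evaluation workers (weight 0) that must not affect the update.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
mpi_rank_weight = 1.0 if comm.Get_rank() % 2 == 0 else 0.0

# Stand-in for a flattened gradient vector on this rank.
local_grad = np.full(4, comm.Get_rank(), dtype=np.float32)

# Sum of per-rank weights replaces the old divisor num_tasks = comm.Get_size().
total_weight = np.zeros(1, np.float32)
comm.Allreduce(np.array([mpi_rank_weight], dtype=np.float32), total_weight, op=MPI.SUM)

# Each rank contributes its gradient scaled by its weight; dividing the summed
# result by total_weight yields a weighted average rather than a plain mean.
buf = np.zeros_like(local_grad)
comm.Allreduce(local_grad * mpi_rank_weight, buf, op=MPI.SUM)
buf /= total_weight[0]
print(comm.Get_rank(), buf)  # identical on every rank

This lines up with the "test workers", "mpi_weight", and "train test comm" items in the commit message above: evaluation workers can presumably be constructed with MpiAdamOptimizer(comm, mpi_rank_weight=0, ...) so they share the communicator without steering training.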