TensorFlow Cluster MNIST 예제
분산병렬 코드 수정하기
NCP TensorFlow Cluster에서 Job Submit(제출)을 하기 위한 코드 수정 방법을 설명합니다.
다음 예제는 /home/ncp/workspace/DistributedTensorFlow.py로, TensorFlow master 노드에 기본 제공됩니다.
수정하거나 추가해야 하는 코드 블록은 아래 두 가지입니다.
- flag에 저장된 Cluster 정보를 파싱하여 ClusterSpec에 패싱하는 블록
- main() 함수를 호출할 때 argparse로 Cluster에서 넘겨주는 인자를 받는 블록
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import sys
import tempfile
import time
import argparse
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
flags = tf.app.flags
flags.DEFINE_string("data_dir", "/home/ncp/mnist-data",
"Directory for storing mnist data")
flags.DEFINE_boolean("download_only", False,
"Only perform downloading of data; Do not proceed to "
"session preparation, model definition or training")
flags.DEFINE_integer("num_gpus", 0, "Total number of gpus for each machine."
"If you don't use GPU, please set it to '0'")
flags.DEFINE_integer("replicas_to_aggregate", None,
"Number of replicas to aggregate before parameter update"
"is applied (For sync_replicas mode only; default: "
"num_workers)")
flags.DEFINE_integer("hidden_units", 1000,
"Number of units in the hidden layer of the NN")
flags.DEFINE_integer("train_steps", 3000,
"Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 100, "Training batch size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
flags.DEFINE_boolean(
"sync_replicas", True,
"Use the sync_replicas (synchronized replicas) mode, "
"wherein the parameter updates from workers are aggregated "
"before applied to avoid stale gradients")
flags.DEFINE_boolean(
"existing_servers", False, "Whether servers already exists. If True, "
"will use the worker hosts via their GRPC URLs (one client process "
"per worker host). Otherwise, will create an in-process TensorFlow "
"server.")
FLAGS = flags.FLAGS
NCP_FLAGS = None
IMAGE_PIXELS = 28
def main(unused_argv):
if NCP_FLAGS.job_name == "worker":
mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
if FLAGS.download_only:
sys.exit(0)
다음은 사용자 코드에 별도의 추가가 필요한 코드 블록입니다. Cluster에서 사용자 코드에 넘겨주는 Cluster 스펙 인자 값을 파싱해서 ClusterSpec에 넘겨줍니다. TF_CONFIG 시스템 변수 활용법은 아래에서 별도로 설명합니다.
if NCP_FLAGS.job_name is None or NCP_FLAGS.job_name == "":
raise ValueError("Must specify an explicit `job_name`")
if NCP_FLAGS.task_index is None or NCP_FLAGS.task_index == "":
raise ValueError("Must specify an explicit `task_index`")
# Construct the cluster and start the server
ps_spec = NCP_FLAGS.ps_hosts.split(",")
worker_spec = NCP_FLAGS.worker_hosts.split(",")
# Get the number of workers.
num_workers = len(worker_spec)
cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})
아래 코드 블록은 별도의 수정이 필요 없습니다.
if not FLAGS.existing_servers:
# Not using existing servers. Create an in-process server.
server = tf.train.Server(cluster, job_name=NCP_FLAGS.job_name, task_index=NCP_FLAGS.task_index)
if NCP_FLAGS.job_name == "ps":
server.join()
is_chief = (NCP_FLAGS.task_index == 0)
if FLAGS.num_gpus > 0:
# Avoid gpu allocation conflict: now allocate task_num -> #gpu
# for each worker in the corresponding machine
gpu = (NCP_FLAGS.task_index % FLAGS.num_gpus)
worker_device = "/job:worker/task:%d/gpu:%d" % (NCP_FLAGS.task_index, gpu)
elif FLAGS.num_gpus == 0:
# Just allocate the CPU to worker server
cpu = 0
worker_device = "/job:worker/task:%d/cpu:%d" % (NCP_FLAGS.task_index, cpu)
# The device setter will automatically place Variables ops on separate
# parameter servers (ps). The non-Variable ops will be placed on the workers.
# The ps use CPU and workers use corresponding GPU
with tf.device(
tf.train.replica_device_setter(
worker_device=worker_device,
ps_device="/job:ps/cpu:0",
cluster=cluster)):
global_step = tf.Variable(0, name="global_step", trainable=False)
# Variables of the hidden layer
hid_w = tf.Variable(
tf.truncated_normal(
[IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
stddev=1.0 / IMAGE_PIXELS),
name="hid_w")
hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
# Variables of the softmax layer
sm_w = tf.Variable(
tf.truncated_normal(
[FLAGS.hidden_units, 10],
stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
# Ops: located on the worker specified with NCP_FLAGS.task_index
x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
y_ = tf.placeholder(tf.float32, [None, 10])
hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
hid = tf.nn.relu(hid_lin)
y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
opt = tf.train.AdamOptimizer(FLAGS.learning_rate)
if FLAGS.sync_replicas:
if FLAGS.replicas_to_aggregate is None:
replicas_to_aggregate = num_workers
else:
replicas_to_aggregate = FLAGS.replicas_to_aggregate
opt = tf.train.SyncReplicasOptimizer(
opt,
replicas_to_aggregate=replicas_to_aggregate,
total_num_replicas=num_workers,
name="mnist_sync_replicas")
train_step = opt.minimize(cross_entropy, global_step=global_step)
if FLAGS.sync_replicas:
local_init_op = opt.local_step_init_op
if is_chief:
local_init_op = opt.chief_init_op
ready_for_local_init_op = opt.ready_for_local_init_op
# Initial token and chief queue runners required by the sync_replicas mode
chief_queue_runner = opt.get_chief_queue_runner()
sync_init_op = opt.get_init_tokens_op()
init_op = tf.global_variables_initializer()
train_dir = tempfile.mkdtemp()
if FLAGS.sync_replicas:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
local_init_op=local_init_op,
ready_for_local_init_op=ready_for_local_init_op,
recovery_wait_secs=1,
global_step=global_step)
else:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
recovery_wait_secs=1,
global_step=global_step)
sess_config = tf.ConfigProto(
allow_soft_placement=True,
log_device_placement=False,
device_filters=["/job:ps",
"/job:worker/task:%d" % NCP_FLAGS.task_index])
# The chief worker (task_index==0) session will prepare the session,
# while the remaining workers will wait for the preparation to complete.
if is_chief:
print("Worker %d: Initializing session..." % NCP_FLAGS.task_index)
else:
print("Worker %d: Waiting for session to be initialized..." %
NCP_FLAGS.task_index)
if FLAGS.existing_servers:
server_grpc_url = "grpc://" + worker_spec[NCP_FLAGS.task_index]
print("Using existing server at: %s" % server_grpc_url)
sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config)
else:
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
print("Worker %d: Session initialization complete." % NCP_FLAGS.task_index)
if FLAGS.sync_replicas and is_chief:
# Chief worker will start the chief queue runner and call the init op.
sess.run(sync_init_op)
sv.start_queue_runners(sess, [chief_queue_runner])
# Perform training
time_begin = time.time()
print("Training begins @ %f" % time_begin)
local_step = 0
while True:
# Training feed
batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
train_feed = {x: batch_xs, y_: batch_ys}
_, step = sess.run([train_step, global_step], feed_dict=train_feed)
local_step += 1
now = time.time()
print("%f: Worker %d: training step %d done (global step: %d)" %
(now, NCP_FLAGS.task_index, local_step, step))
if step >= FLAGS.train_steps:
break
time_end = time.time()
print("Training ends @ %f" % time_end)
training_time = time_end - time_begin
print("Training elapsed time: %f s" % training_time)
# Validation feed
val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}
val_xent = sess.run(cross_entropy, feed_dict=val_feed)
print("After %d training step(s), validation cross entropy = %g" %
(FLAGS.train_steps, val_xent))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
다음은 사용자 코드에 별도의 추가가 필요한 코드 블록입니다. Cluster에서 넘겨주는 Cluster의 구성 정보를 argparse로 자동으로 받습니다.
- --worker_hosts와 --ps_hosts는 모든 노드 서버에 동일한 값이 전달됩니다.
- --job_name과 --task_index는 시스템에서 자동으로 해당 노드에 맞는 정보를 제공합니다(단, 'tcm submit' 실행 시 ps_num 값을 지정하여 파라미터 서버의 수를 증가시킬 수 있으며, 이 경우 적용된 파라미터가 전달됩니다).
- --worker_hosts, --ps_hosts, --job_name, --task_index는 미리 정의된 예약 인자 값이므로 그대로 사용해야 합니다.
- unparsed라는 인자 값을 통해서 사용자 정의 인자 값을 받을 수 있습니다.
## get cluster info by NCP
parser.add_argument("--worker_hosts", type=str)
parser.add_argument("--ps_hosts", type=str)
parser.add_argument("--job_name", type=str)
parser.add_argument("--task_index", type=int)
- unparsed 인자 값을 사용하려면 parser.parse_args() 대신에 parser.parse_knwon_args()를 사용합니다.
NCP_FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
TF_CONFIG 시스템 변수의 활용
Cluster Spec 정보를 RunConfig로 파싱하는 경우에는 split(",")를 이용하여 파싱하는 대신 아래의 코드를 활용하세요.
tf_config_dic = {"cluster": {"ps": ps_str.split(","),
"worker": worker_str.split(","),
# "master": master_str.split(",")
},
"task": {"type": task_type,
"index": task_index}}
os.environ["TF_CONFIG"] = json.dumps(tf_config_dic)
- 마스터 노드는 Special Work Node로서 보통 모델의 Checksum과 로그를 관리하는 용도로 사용됩니다.
- 필요한 경우 워커 서버리스트 중 한대를 지정하여 포트 번호를 다른 번호로 변경 후 사용하세요(이 경우 마스터 노드의 task_index는 통상 0입니다).
- 수동으로 TF_CONFIG를 읽는 경우 os.environ.get()을 사용하세요.
외부 모듈 임포트 방법
'tcm submit'은 단일 학습 프로그램만 각 노드에 배포하도록 구성되어 있습니다. 외부 모듈을 임포트해야 하는 경우 공용 NAS 스토리지 /mnt/nas에 코드를 복사한 뒤 아래와 같이 사용하세요.
import sys
sys.path.insert(0, [모듈의 PATH])
import [모듈명]
tcm submit을 사용하지 않고 수동으로 실행하는 방법
사용자 패키지의 구조상 tcm submit
명령어를 사용하기 어려운 경우 Cluster master 콘솔에서 다음과 같이 명령을 실행할 수 있습니다.
- 서버 노드 접근 포트는 기본적으로 22번입니다.
program_path
는 사용자 코드의 경로입니다. 모든 서버 노드에서 접근이 가능해야 하기 때문에 /mnt/nas 하위에 위치시켜 주세요.arg_string
은 사용자 프로그램에 넘겨야 할 커스텀 인자 값이 있는 경우의 예입니다./home/ncp/ncp.log
는 로그 경로의 예입니다. /mnt/nas 하위에 log 폴더를 만들어서 사용하십시오.
_cmd = "ssh -p " + port + " root@" + target_ip + \
" " + "/home/ncp/anaconda2/envs/ncp/bin/python " \
+ program_path + " " + arg_string + \
" ' >> /mnt/nas/log/ncp.log 2>&1 &"
os.system(_cmd)