From 576652e94347abe63f213c8d9df5cb491ac6bf78 Mon Sep 17 00:00:00 2001 From: Zhang-Zhenning Date: Thu, 28 Jul 2022 08:52:36 +0000 Subject: [PATCH] reorganize the file structure --- .../features/serving/DIEN/generate_data.py | 125 ++++++ .../serving/DIEN/prepare_savedmodel.py | 100 +++++ .../serving/DIEN/start_serving_dien.cc | 263 ++++++++++++ .../serving/DeepFM/prepare_savedmodel.py | 266 ++++++++++++ .../serving/DeepFM/start_serving_deepfm.cc | 244 +++++++++++ modelzoo/features/serving/README.md | 62 +++ .../serving/WDL/prepare_savedmodel.py | 393 ++++++++++++++++++ .../features/serving/WDL/start_serving_wdl.cc | 251 +++++++++++ 8 files changed, 1704 insertions(+) create mode 100644 modelzoo/features/serving/DIEN/generate_data.py create mode 100644 modelzoo/features/serving/DIEN/prepare_savedmodel.py create mode 100644 modelzoo/features/serving/DIEN/start_serving_dien.cc create mode 100644 modelzoo/features/serving/DeepFM/prepare_savedmodel.py create mode 100644 modelzoo/features/serving/DeepFM/start_serving_deepfm.cc create mode 100644 modelzoo/features/serving/README.md create mode 100644 modelzoo/features/serving/WDL/prepare_savedmodel.py create mode 100644 modelzoo/features/serving/WDL/start_serving_wdl.cc diff --git a/modelzoo/features/serving/DIEN/generate_data.py b/modelzoo/features/serving/DIEN/generate_data.py new file mode 100644 index 00000000000..e948a3d0cc4 --- /dev/null +++ b/modelzoo/features/serving/DIEN/generate_data.py @@ -0,0 +1,125 @@ +from posixpath import join +import numpy +from numpy.lib.npyio import save +from script.data_iterator import DataIterator +import tensorflow as tf +import time +import random +import sys +from script.utils import * +from tensorflow.python.framework import ops +import os +import json + +EMBEDDING_DIM = 18 +HIDDEN_SIZE = 18 * 2 +ATTENTION_SIZE = 18 * 2 +best_auc = 0.0 +best_case_acc = 0.0 +batch_size=1 +maxlen=100 + +data_location='../data' +test_file = os.path.join(data_location, "local_test_splitByUser") 
+uid_voc = os.path.join(data_location, "uid_voc.pkl") +mid_voc = os.path.join(data_location, "mid_voc.pkl") +cat_voc = os.path.join(data_location, "cat_voc.pkl") + +def prepare_data(input, target, maxlen=None, return_neg=False): + # x: a list of sentences + lengths_x = [len(s[4]) for s in input] + seqs_mid = [inp[3] for inp in input] + seqs_cat = [inp[4] for inp in input] + noclk_seqs_mid = [inp[5] for inp in input] + noclk_seqs_cat = [inp[6] for inp in input] + + if maxlen is not None: + new_seqs_mid = [] + new_seqs_cat = [] + new_noclk_seqs_mid = [] + new_noclk_seqs_cat = [] + new_lengths_x = [] + for l_x, inp in zip(lengths_x, input): + if l_x > maxlen: + new_seqs_mid.append(inp[3][l_x - maxlen:]) + new_seqs_cat.append(inp[4][l_x - maxlen:]) + new_noclk_seqs_mid.append(inp[5][l_x - maxlen:]) + new_noclk_seqs_cat.append(inp[6][l_x - maxlen:]) + new_lengths_x.append(maxlen) + else: + new_seqs_mid.append(inp[3]) + new_seqs_cat.append(inp[4]) + new_noclk_seqs_mid.append(inp[5]) + new_noclk_seqs_cat.append(inp[6]) + new_lengths_x.append(l_x) + lengths_x = new_lengths_x + seqs_mid = new_seqs_mid + seqs_cat = new_seqs_cat + noclk_seqs_mid = new_noclk_seqs_mid + noclk_seqs_cat = new_noclk_seqs_cat + + if len(lengths_x) < 1: + return None, None, None, None + + n_samples = len(seqs_mid) + maxlen_x = numpy.max(lengths_x) + neg_samples = len(noclk_seqs_mid[0][0]) + + mid_his = numpy.zeros((n_samples, maxlen_x)).astype('int64') + cat_his = numpy.zeros((n_samples, maxlen_x)).astype('int64') + noclk_mid_his = numpy.zeros( + (n_samples, maxlen_x, neg_samples)).astype('int64') + noclk_cat_his = numpy.zeros( + (n_samples, maxlen_x, neg_samples)).astype('int64') + mid_mask = numpy.zeros((n_samples, maxlen_x)).astype('float32') + for idx, [s_x, s_y, no_sx, no_sy] in enumerate( + zip(seqs_mid, seqs_cat, noclk_seqs_mid, noclk_seqs_cat)): + mid_mask[idx, :lengths_x[idx]] = 1. 
+ mid_his[idx, :lengths_x[idx]] = s_x + cat_his[idx, :lengths_x[idx]] = s_y + noclk_mid_his[idx, :lengths_x[idx], :] = no_sx + noclk_cat_his[idx, :lengths_x[idx], :] = no_sy + + uids = numpy.array([inp[0] for inp in input]) + mids = numpy.array([inp[1] for inp in input]) + cats = numpy.array([inp[2] for inp in input]) + + if return_neg: + return uids, mids, cats, mid_his, cat_his, mid_mask, numpy.array( + target), numpy.array(lengths_x), noclk_mid_his, noclk_cat_his + + else: + return uids, mids, cats, mid_his, cat_his, mid_mask, numpy.array( + target), numpy.array(lengths_x) + + +test_data = DataIterator(test_file, + uid_voc, + mid_voc, + cat_voc, + batch_size, + maxlen, + data_location=data_location) + +f = open("./test_data.csv","w") +counter = 0 + +for src, tgt in test_data: + uids, mids, cats, mid_his, cat_his, mid_mask, target, sl = prepare_data(src, tgt) + all_data = [uids, mids, cats, mid_his, cat_his, mid_mask, target, sl] + for cur_data in all_data: + cur_data = numpy.squeeze(cur_data).reshape(-1) + for col in range(cur_data.shape[0]): + uid = cur_data[col] + # print(uid) + if col == cur_data.shape[0]-1: + f.write(str(uid)+",k,") + break + f.write(str(uid)+",") + + f.write("\n"); + if counter >= 1: + break + counter += 1 + +f.close() \ No newline at end of file diff --git a/modelzoo/features/serving/DIEN/prepare_savedmodel.py b/modelzoo/features/serving/DIEN/prepare_savedmodel.py new file mode 100644 index 00000000000..12edb156ea2 --- /dev/null +++ b/modelzoo/features/serving/DIEN/prepare_savedmodel.py @@ -0,0 +1,100 @@ +from posixpath import join +import numpy +from numpy.lib.npyio import save +from script.data_iterator import DataIterator +import tensorflow as tf +from script.model import * +import time +import random +import sys +from script.utils import * +from tensorflow.python.framework import ops +from tensorflow.python.client import timeline +import argparse +import os +import json +import pickle as pkl + +EMBEDDING_DIM = 18 +HIDDEN_SIZE = 18 * 2 
+ATTENTION_SIZE = 18 * 2 +best_auc = 0.0 +best_case_acc = 0.0 +batch_size = 128 + +def unicode_to_utf8(d): + return dict((key.encode("UTF-8"), value) for (key, value) in d.items()) + + +def load_dict(filename): + try: + with open(filename, 'rb') as f: + return unicode_to_utf8(json.load(f)) + except: + with open(filename, 'rb') as f: + return pkl.load(f) + + +def main(n_uid,n_mid,n_cat): + + with tf.Session() as sess1: + + model = Model_DIN_V2_Gru_Vec_attGru_Neg(n_uid, n_mid, n_cat, + EMBEDDING_DIM, HIDDEN_SIZE, + ATTENTION_SIZE) + + # Initialize saver + folder_dir = args.checkpoint + saver = tf.train.Saver() + + sess1.run(tf.global_variables_initializer()) + sess1.run(tf.local_variables_initializer()) + # Restore from checkpoint + saver.restore(sess1,tf.train.latest_checkpoint(folder_dir)) + + # Get save directory + dir = "./savedmodels" + os.makedirs(dir,exist_ok=True) + cc_time = int(time.time()) + saved_path = os.path.join(dir,str(cc_time)) + os.mkdir(saved_path) + + + tf.saved_model.simple_save( + sess1, + saved_path, + inputs = {"Inputs/mid_his_batch_ph:0":model.mid_his_batch_ph,"Inputs/cat_his_batch_ph:0":model.cat_his_batch_ph, + "Inputs/uid_batch_ph:0":model.uid_batch_ph,"Inputs/mid_batch_ph:0":model.mid_batch_ph,"Inputs/cat_batch_ph:0":model.cat_batch_ph, + "Inputs/mask:0":model.mask,"Inputs/seq_len_ph:0":model.seq_len_ph,"Inputs/target_ph:0":model.target_ph}, + outputs = {"top_full_connect/add_2:0":model.y_hat} + ) + + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument('--checkpoint', + help='ckpt path', + required=False, + default='../data') + parser.add_argument('--bf16', + help='enable DeepRec BF16 in deep model. 
Default FP32', + action='store_true') + parser.add_argument('--data_location', + help='Full path of train data', + required=False, + default='./data') + args = parser.parse_args() + + uid_voc = os.path.join(args.data_location, "uid_voc.pkl") + mid_voc = os.path.join(args.data_location, "mid_voc.pkl") + cat_voc = os.path.join(args.data_location, "cat_voc.pkl") + + uid_d = load_dict(uid_voc) + mid_d = load_dict(mid_voc) + cat_d = load_dict(cat_voc) + + main(len(uid_d),len(mid_d),len(cat_d)) + + diff --git a/modelzoo/features/serving/DIEN/start_serving_dien.cc b/modelzoo/features/serving/DIEN/start_serving_dien.cc new file mode 100644 index 00000000000..aa29c9e045a --- /dev/null +++ b/modelzoo/features/serving/DIEN/start_serving_dien.cc @@ -0,0 +1,263 @@ +#include +#include +#include "serving/processor/serving/processor.h" +#include "serving/processor/serving/predict.pb.h" +#include +#include + + +static const char* model_config = "{ \ + \"omp_num_threads\": 4, \ + \"kmp_blocktime\": 0, \ + \"feature_store_type\": \"memory\", \ + \"serialize_protocol\": \"protobuf\", \ + \"inter_op_parallelism_threads\": 10, \ + \"intra_op_parallelism_threads\": 10, \ + \"init_timeout_minutes\": 1, \ + \"signature_name\": \"serving_default\", \ + \"read_thread_num\": 3, \ + \"update_thread_num\": 2, \ + \"model_store_type\": \"local\", \ + \"checkpoint_dir\": \"/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/DIEN/result/\", \ + \"savedmodel_dir\": \"/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/DIEN/savedmodels/1658740712/\" \ + } "; + + + +::tensorflow::eas::ArrayProto get_proto_cc(std::vector& cur_vector, ::tensorflow::eas::ArrayDataType dtype_f){ + ::tensorflow::eas::ArrayShape array_shape; + ::tensorflow::eas::ArrayProto input; + + int num_elem = (int)cur_vector.size(); + input.set_dtype(dtype_f); + + switch(dtype_f){ + case 1: + array_shape.add_dim(1); + if (num_elem == 1){ + input.add_float_val((float)atof(cur_vector.back())); + 
*(input.mutable_array_shape()) = array_shape; + return input; + } + array_shape.add_dim(cur_vector.size()); + for(unsigned int tt = 0; tt < cur_vector.size(); ++tt) + { + input.add_float_val((float)atof(cur_vector[tt])); + } + *(input.mutable_array_shape()) = array_shape; + + return input; + + break; + + case 3: + array_shape.add_dim(1); + if (num_elem == 1){ + input.add_int_val((int)atoi(cur_vector.back())); + *(input.mutable_array_shape()) = array_shape; + return input; + } + array_shape.add_dim(cur_vector.size()); + for(unsigned int tt = 0; tt < cur_vector.size(); ++tt) + { + input.add_int_val((int)atoi(cur_vector[tt])); + } + *(input.mutable_array_shape()) = array_shape; + + return input; + break; + + default: + break; + } + + std::cerr << "type error\n"; + return input; +} + + + + +int main(int argc, char** argv) { + + char filepath[] = "/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/DIEN/test_data.csv"; + + // // ------------------------------------------initialize serving model----------------------------------------- + int state; + void* model = initialize("", model_config, &state); + if (state == -1) { + std::cerr << "initialize error\n"; + } + + // // ---------------------------------------prepare serving data from file-------------------------------------- + + FILE *fp = nullptr; + char *line, *record; + char buffer2[1024]; + char delim[] = ","; + char next_line[] = "k"; + int cur_type = 0; + + // vector variables + std::vector cur_uids; + std::vector cur_mids; + std::vector cur_cats; + std::vector cur_sl; // single + std::vector cur_mid_his; + std::vector cur_cat_his; + std::vector cur_mid_mask; + std::vector cur_target; // multiple + + // temp pointers + std::vector temp_ptrs; + + // start read file + if ( (fp = fopen(filepath,"at+")) != nullptr) { + + // read line by line + while ((line = fgets(buffer2, sizeof(buffer2), fp)) != NULL) { + + // clear all vectors + cur_uids.clear(); + cur_mids.clear(); + cur_cats.clear(); + cur_sl.clear(); + 
cur_mid_his.clear(); + cur_cat_his.clear(); + cur_mid_mask.clear(); + cur_target.clear(); + + // reinitialize the type variable + cur_type = 0; + + // free memory and clear ptrs + for(int i = 0; i < (int)temp_ptrs.size(); ++i){free(temp_ptrs[i]);} + temp_ptrs.clear(); + + // traverse current line + record = strtok(line, delim); + while (record != nullptr) { + // end of current line + if (cur_type >= 8) {break;} + // next type will start + if (*record == *next_line) {cur_type++; record = strtok(NULL,delim); continue;} + // switch + switch (cur_type) + { + + case 0: + + temp_ptrs.push_back((char*) malloc(sizeof(char)*strlen(record))); + strcpy(temp_ptrs.back(),record); + cur_uids.push_back(temp_ptrs.back()); + break; + + case 1: + + temp_ptrs.push_back((char*) malloc(sizeof(char)*strlen(record))); + strcpy(temp_ptrs.back(),record); + cur_mids.push_back(temp_ptrs.back()); + break; + + case 2: + + temp_ptrs.push_back((char*) malloc(sizeof(char)*strlen(record))); + strcpy(temp_ptrs.back(),record); + cur_cats.push_back(temp_ptrs.back()); + break; + + case 3: + + temp_ptrs.push_back((char*) malloc(sizeof(char)*strlen(record))); + strcpy(temp_ptrs.back(),record); + cur_mid_his.push_back(temp_ptrs.back()); + break; + + case 4: + + temp_ptrs.push_back((char*) malloc(sizeof(char)*strlen(record))); + strcpy(temp_ptrs.back(),record); + cur_cat_his.push_back(temp_ptrs.back()); + break; + + case 5: + + temp_ptrs.push_back((char*) malloc(sizeof(char)*strlen(record))); + strcpy(temp_ptrs.back(),record); + cur_mid_mask.push_back(temp_ptrs.back()); + break; + + case 6: + + temp_ptrs.push_back((char*) malloc(sizeof(char)*strlen(record))); + strcpy(temp_ptrs.back(),record); + cur_target.push_back(temp_ptrs.back()); + break; + + case 7: + + temp_ptrs.push_back((char*) malloc(sizeof(char)*strlen(record))); + strcpy(temp_ptrs.back(),record); + cur_sl.push_back(temp_ptrs.back()); + break; + + default: + break; + } + + record = strtok(NULL,delim); + + } + + + 
::tensorflow::eas::ArrayDataType dtype_i = + ::tensorflow::eas::ArrayDataType::DT_INT32; + ::tensorflow::eas::ArrayDataType dtype_f = + ::tensorflow::eas::ArrayDataType::DT_FLOAT; + // get all inputs + ::tensorflow::eas::ArrayProto proto_uids = get_proto_cc(cur_uids,dtype_i); // -1 + ::tensorflow::eas::ArrayProto proto_mids = get_proto_cc(cur_mids,dtype_i); // -1 + ::tensorflow::eas::ArrayProto proto_cats = get_proto_cc(cur_cats,dtype_i); // -1 + ::tensorflow::eas::ArrayProto proto_mid_his = get_proto_cc(cur_mid_his,dtype_i); // -1 -1 + ::tensorflow::eas::ArrayProto proto_cat_his = get_proto_cc(cur_cat_his,dtype_i); // -1 -1 + ::tensorflow::eas::ArrayProto proto_mid_mask= get_proto_cc(cur_mid_mask,dtype_f); //float // -1 -1 + ::tensorflow::eas::ArrayProto proto_target = get_proto_cc(cur_target,dtype_f); //float // -1 -1 + ::tensorflow::eas::ArrayProto proto_sl = get_proto_cc(cur_sl,dtype_i); // -1 + + + // setup request + ::tensorflow::eas::PredictRequest req; + req.set_signature_name("serving_default"); + req.add_output_filter("top_full_connect/add_2:0"); + + (*req.mutable_inputs())["Inputs/uid_batch_ph:0"] = proto_uids; + (*req.mutable_inputs())["Inputs/mid_batch_ph:0"] = proto_mids; + (*req.mutable_inputs())["Inputs/cat_batch_ph:0"] = proto_cats; + (*req.mutable_inputs())["Inputs/mid_his_batch_ph:0"] = proto_mid_his; + (*req.mutable_inputs())["Inputs/cat_his_batch_ph:0"] = proto_cat_his; + (*req.mutable_inputs())["Inputs/mask:0"] = proto_mid_mask; + (*req.mutable_inputs())["Inputs/target_ph:0"] = proto_target; + (*req.mutable_inputs())["Inputs/seq_len_ph:0"] = proto_sl; + + size_t size = req.ByteSizeLong(); + void *buffer1 = malloc(size); + req.SerializeToArray(buffer1, size); + + // ----------------------------------------------process and get feedback--------------------------------------------------- + void* output = nullptr; + int output_size = 0; + state = process(model, buffer1, size, &output, &output_size); + + // parse response + std::string 
output_string((char*)output, output_size); + ::tensorflow::eas::PredictResponse resp; + resp.ParseFromString(output_string); + std::cout << "process returned state: " << state << ", response: " << resp.DebugString(); + + } + } + + fclose(fp); + return 0; +} + diff --git a/modelzoo/features/serving/DeepFM/prepare_savedmodel.py b/modelzoo/features/serving/DeepFM/prepare_savedmodel.py new file mode 100644 index 00000000000..4166e2c660f --- /dev/null +++ b/modelzoo/features/serving/DeepFM/prepare_savedmodel.py @@ -0,0 +1,266 @@ +import time +import argparse +import tensorflow as tf +import os +import sys +import math +import collections +from tensorflow.python.client import timeline +import json + +CONTINUOUS_COLUMNS = ["I" + str(i) for i in range(1, 14)] # 1-13 inclusive +CATEGORICAL_COLUMNS = ["C" + str(i) for i in range(1, 27)] # 1-26 inclusive +LABEL_COLUMN = ["clicked"] +TRAIN_DATA_COLUMNS = LABEL_COLUMN + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS +FEATURE_COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS +HASH_BUCKET_SIZES = { + 'C1': 2500, + 'C2': 2000, + 'C3': 5000000, + 'C4': 1500000, + 'C5': 1000, + 'C6': 100, + 'C7': 20000, + 'C8': 4000, + 'C9': 20, + 'C10': 100000, + 'C11': 10000, + 'C12': 5000000, + 'C13': 40000, + 'C14': 100, + 'C15': 100, + 'C16': 3000000, + 'C17': 50, + 'C18': 10000, + 'C19': 4000, + 'C20': 20, + 'C21': 4000000, + 'C22': 100, + 'C23': 100, + 'C24': 250000, + 'C25': 400, + 'C26': 100000 +} + + +def add_layer_summary(value, tag): + tf.summary.scalar('%s/fraction_of_zero_values' % tag, + tf.nn.zero_fraction(value)) + tf.summary.histogram('%s/activation' % tag, value) + + +def build_feature_cols(): + wide_column = [] + deep_column = [] + fm_column = [] + for column_name in FEATURE_COLUMNS: + if column_name in CATEGORICAL_COLUMNS: + categorical_column = tf.feature_column.categorical_column_with_embedding( + column_name, + dtype=tf.string) + + categorical_embedding_column = tf.feature_column.embedding_column( + categorical_column, 
dimension=16, combiner='mean') + + wide_column.append(categorical_embedding_column) + deep_column.append(categorical_embedding_column) + fm_column.append(categorical_embedding_column) + else: + column = tf.feature_column.numeric_column(column_name, shape=(1, )) + wide_column.append(column) + deep_column.append(column) + + return wide_column, fm_column, deep_column + + +class DeepFM(): + def __init__(self, + wide_column=None, + fm_column=None, + deep_column=None, + dnn_hidden_units=[1024, 256, 32], + final_hidden_units=[128, 64], + optimizer_type='adam', + learning_rate=0.001, + inputs=None, + use_bn=True, + bf16=False, + input_layer_partitioner=None, + dense_layer_partitioner=None): + if not inputs: + raise ValueError('Dataset is not defined.') + self.wide_column = wide_column + self.deep_column = deep_column + self.fm_column = fm_column + if not wide_column or not fm_column or not deep_column: + raise ValueError( + 'Wide column, FM column or Deep column is not defined.') + self.dnn_hidden_units = dnn_hidden_units + self.final_hidden_units = final_hidden_units + self.optimizer_type = optimizer_type + self.learning_rate = learning_rate + self.input_layer_partitioner = input_layer_partitioner + self.dense_layer_partitioner = dense_layer_partitioner + + self.feature = inputs + self.bf16 = bf16 + self.use_bn = use_bn + + self.predict = self.prediction() + + + def dnn(self, dnn_input, dnn_hidden_units=None, layer_name=''): + for layer_id, num_hidden_units in enumerate(dnn_hidden_units): + with tf.variable_scope(layer_name + "_%d" % layer_id, + partitioner=self.dense_layer_partitioner, + reuse=tf.AUTO_REUSE) as dnn_layer_scope: + dnn_input = tf.layers.dense(dnn_input, + units=num_hidden_units, + activation=tf.nn.relu, + name=dnn_layer_scope) + if self.use_bn: + dnn_input = tf.layers.batch_normalization( + dnn_input) + add_layer_summary(dnn_input, dnn_layer_scope.name) + + return dnn_input + + + def prediction(self): + # input features + with 
tf.variable_scope('input_layer', + partitioner=self.input_layer_partitioner, + reuse=tf.AUTO_REUSE): + + fm_cols = {} + wide_input = tf.feature_column.input_layer( + self.feature, self.wide_column, cols_to_output_tensors=fm_cols) + fm_input = tf.stack([fm_cols[cols] for cols in self.fm_column], 1) + dnn_input = tf.feature_column.input_layer(self.feature, + self.deep_column) + + if self.bf16: + wide_input = tf.cast(wide_input, dtype=tf.bfloat16) + fm_input = tf.cast(fm_input, dtype=tf.bfloat16) + dnn_input = tf.cast(dnn_input, dtype=tf.bfloat16) + + # DNN part + if self.bf16: + with tf.variable_scope('dnn').keep_weights(): + dnn_output = self.dnn(dnn_input, self.dnn_hidden_units, + 'dnn_layer') + else: + with tf.variable_scope('dnn'): + dnn_output = self.dnn(dnn_input, self.dnn_hidden_units, + 'dnn_layer') + + # linear / fisrt order part + with tf.variable_scope('linear', reuse=tf.AUTO_REUSE) as linear: + linear_output = tf.reduce_sum(wide_input, axis=1, keepdims=True) + + # FM second order part + with tf.variable_scope('fm', reuse=tf.AUTO_REUSE) as fm: + sum_square = tf.square(tf.reduce_sum(fm_input, axis=1)) + square_sum = tf.reduce_sum(tf.square(fm_input), axis=1) + fm_output = 0.5 * tf.subtract(sum_square, square_sum) + + # Final dnn layer + all_input = tf.concat([dnn_output, linear_output, fm_output], 1) + if self.bf16: + with tf.variable_scope('final_dnn').keep_weights(): + net = self.dnn(all_input, self.final_hidden_units, 'final_dnn') + net = tf.cast(net, dtype=tf.float32) + else: + with tf.variable_scope('final_dnn'): + net = self.dnn(all_input, self.final_hidden_units, 'final_dnn') + + net = tf.layers.dense(net, units=1) + net = tf.math.sigmoid(net) + self.output = net + return net + + +def get_arg_parser(): + parser = argparse.ArgumentParser() + parser.add_argument('--data_location', + help='Full path of train data', + required=False, + default='../data') + parser.add_argument('--batch_size', + help='Batch size to train. 
Default is 512', + type=int, + default=512) + parser.add_argument('--checkpoint', + help='Full path to checkpoints input/output directory', + required=False) + parser.add_argument('--bf16', + help='enable DeepRec BF16 in deep model. Default FP32', + action='store_true') + parser.add_argument("--optimizer", + type=str, + choices=["adam", "adagrad", "adamasync"], + default="adam") + parser.add_argument('--learning_rate', + help='Learning rate for model', + type=float, + default=0.001) + return parser + + +def main(tf_config=None, server=None): + + with tf.Session() as sess1: + batch_size = args.batch_size + + # set fixed random seed + tf.set_random_seed(2021) + + # create data pipline + wide_column, fm_column, deep_column = build_feature_cols() + + final_input = {} + for i in range(1,14): + final_input["I"+str(i)] = tf.placeholder(tf.float32,[None], name='I'+str(i)) + for j in range(1,27): + final_input["C"+str(j)] = tf.placeholder(tf.string, [None], name='C'+str(j)) + + # create model + model = DeepFM(wide_column=wide_column, + fm_column=fm_column, + deep_column=deep_column, + optimizer_type=args.optimizer, + learning_rate=args.learning_rate, + bf16=args.bf16, + inputs=final_input, + input_layer_partitioner=None, + dense_layer_partitioner=None) + + # Initialize saver + folder_dir = args.checkpoint + saver = tf.train.Saver() + sess1.run(tf.global_variables_initializer()) + sess1.run(tf.local_variables_initializer()) + + # Restore from checkpoint + saver.restore(sess1,tf.train.latest_checkpoint(folder_dir)) + # Get save directory + dir = "./savedmodels" + os.makedirs(dir,exist_ok=True) + cc_time = int(time.time()) + saved_path = os.path.join(dir,str(cc_time)) + os.mkdir(saved_path) + + tf.saved_model.simple_save( + sess1, + saved_path, + inputs = model.feature, + outputs = {"Sigmoid":model.output} + ) + + +if __name__ == "__main__": + parser = get_arg_parser() + args = parser.parse_args() + + main() + \ No newline at end of file diff --git 
a/modelzoo/features/serving/DeepFM/start_serving_deepfm.cc b/modelzoo/features/serving/DeepFM/start_serving_deepfm.cc new file mode 100644 index 00000000000..8b41071b353 --- /dev/null +++ b/modelzoo/features/serving/DeepFM/start_serving_deepfm.cc @@ -0,0 +1,244 @@ +#include +#include +#include "serving/processor/serving/processor.h" +#include "serving/processor/serving/predict.pb.h" +#include + +static const char* model_config = "{ \ + \"omp_num_threads\": 4, \ + \"kmp_blocktime\": 0, \ + \"feature_store_type\": \"memory\", \ + \"serialize_protocol\": \"protobuf\", \ + \"inter_op_parallelism_threads\": 10, \ + \"intra_op_parallelism_threads\": 10, \ + \"init_timeout_minutes\": 1, \ + \"signature_name\": \"serving_default\", \ + \"read_thread_num\": 3, \ + \"update_thread_num\": 2, \ + \"model_store_type\": \"local\", \ + \"checkpoint_dir\": \"/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/DeepFM/result/\", \ + \"savedmodel_dir\": \"/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/DeepFM/savedmodels/1658806832/\" \ + } "; + +struct input_format39array{ + float I1_13[13]; + char* C1_26[26]; +}; + + +::tensorflow::eas::ArrayProto get_proto_cc(void* char_input, int dim,::tensorflow::eas::ArrayDataType type){ + ::tensorflow::eas::ArrayShape array_shape; + array_shape.add_dim(1); + ::tensorflow::eas::ArrayProto input; + input.set_dtype(type); + + + switch(input.dtype()){ + case 1: + input.add_float_val(*((float*)char_input)); + break; + case 7: + input.add_string_val((char*)char_input); + break; + } + + *(input.mutable_array_shape()) = array_shape; + + return input; +} + + + + +int main(int argc, char** argv) { + + // PLEASE EDIT THIS LINE!!!! 
+ char filepath[] = "/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/WDL/test.csv"; + + // // ------------------------------------------initialize serving model----------------------------------------- + int state; + void* model = initialize("", model_config, &state); + if (state == -1) { + std::cerr << "initialize error\n"; + } + + // // ---------------------------------------prepare serving data from file-------------------------------------- + + FILE *fp = nullptr; + char *line, *record; + char buffer2[1024]; + char delim[] = ","; + char end[] = "\n"; + int j = 0; + int rows = 0; + + + // get row number + if ( (fp = fopen(filepath,"at+")) != nullptr) { + while ((line = fgets(buffer2, sizeof(buffer2), fp)) != nullptr) rows++; + } + + fclose(fp); + + + // get rows + char* all_elems[rows*39]; + int cur_pos = 0; + + if ( (fp = fopen(filepath,"at+")) != nullptr) { + while ((line = fgets(buffer2, sizeof(buffer2), fp)) != NULL) { + record = strtok(line, delim); + while (record != NULL) { + // only 1 label and 39 feature + if (j >= 40) break; + // disragard label + if (j == 0) {j++; record = strtok(NULL,delim); continue;} + char* cur_item = (char*) malloc(sizeof(char)*strlen(record)); + strcpy(cur_item,record); + if (cur_item[strlen(cur_item)-1] == *end) cur_item[strlen(cur_item)-1] = '\0'; + all_elems[cur_pos] = cur_item; + + cur_pos++; + record = strtok(NULL, delim); + j++; + + } + j = 0; + + } + } + + fclose(fp); + + + + // ----------------------------------------------prepare request input---------------------------------------------------- + for(int ii = 0; ii < rows; ii++ ){ + int start_idx = ii * 39; + + struct input_format39array inputs; + for(int jj = 0; jj < 39; jj++){ + if( jj >= 0 && jj <= 12 ) inputs.I1_13[jj] = (float)(atof(all_elems[start_idx + jj])); + else inputs.C1_26[jj-13] = (char*)all_elems[start_idx+jj]; + } + + + // get input type + ::tensorflow::eas::ArrayDataType dtype_f = + ::tensorflow::eas::ArrayDataType::DT_FLOAT; + + 
::tensorflow::eas::ArrayDataType dtype_s = + ::tensorflow::eas::ArrayDataType::DT_STRING; + + // input setting + ::tensorflow::eas::ArrayProto I1 = get_proto_cc(&inputs.I1_13[0],1,dtype_f); + ::tensorflow::eas::ArrayProto I2 = get_proto_cc(&inputs.I1_13[1],1,dtype_f); + ::tensorflow::eas::ArrayProto I3 = get_proto_cc(&inputs.I1_13[2],1,dtype_f); + ::tensorflow::eas::ArrayProto I4 = get_proto_cc(&inputs.I1_13[3],1,dtype_f); + ::tensorflow::eas::ArrayProto I5 = get_proto_cc(&inputs.I1_13[4],1,dtype_f); + ::tensorflow::eas::ArrayProto I6 = get_proto_cc(&inputs.I1_13[5],1,dtype_f); + ::tensorflow::eas::ArrayProto I7 = get_proto_cc(&inputs.I1_13[6],1,dtype_f); + ::tensorflow::eas::ArrayProto I8 = get_proto_cc(&inputs.I1_13[7],1,dtype_f); + ::tensorflow::eas::ArrayProto I9 = get_proto_cc(&inputs.I1_13[8],1,dtype_f); + ::tensorflow::eas::ArrayProto I10 = get_proto_cc(&inputs.I1_13[9],1,dtype_f); + ::tensorflow::eas::ArrayProto I11 = get_proto_cc(&inputs.I1_13[10],1,dtype_f); + ::tensorflow::eas::ArrayProto I12 = get_proto_cc(&inputs.I1_13[11],1,dtype_f); + ::tensorflow::eas::ArrayProto I13 = get_proto_cc(&inputs.I1_13[12],1,dtype_f); + ::tensorflow::eas::ArrayProto C1 = get_proto_cc(inputs.C1_26[0],strlen(inputs.C1_26[0]),dtype_s); + ::tensorflow::eas::ArrayProto C2 = get_proto_cc(inputs.C1_26[1],strlen(inputs.C1_26[1]),dtype_s); + ::tensorflow::eas::ArrayProto C3 = get_proto_cc(inputs.C1_26[2],strlen(inputs.C1_26[2]),dtype_s); + ::tensorflow::eas::ArrayProto C4 = get_proto_cc(inputs.C1_26[3],strlen(inputs.C1_26[3]),dtype_s); + ::tensorflow::eas::ArrayProto C5 = get_proto_cc(inputs.C1_26[4],strlen(inputs.C1_26[4]),dtype_s); + ::tensorflow::eas::ArrayProto C6 = get_proto_cc(inputs.C1_26[5],strlen(inputs.C1_26[5]),dtype_s); + ::tensorflow::eas::ArrayProto C7 = get_proto_cc(inputs.C1_26[6],strlen(inputs.C1_26[6]),dtype_s); + ::tensorflow::eas::ArrayProto C8 = get_proto_cc(inputs.C1_26[7],strlen(inputs.C1_26[7]),dtype_s); + ::tensorflow::eas::ArrayProto C9 = 
get_proto_cc(inputs.C1_26[8],strlen(inputs.C1_26[8]),dtype_s); + ::tensorflow::eas::ArrayProto C10 = get_proto_cc(inputs.C1_26[9],strlen(inputs.C1_26[9]),dtype_s); + ::tensorflow::eas::ArrayProto C11 = get_proto_cc(inputs.C1_26[10],strlen(inputs.C1_26[10]),dtype_s); + ::tensorflow::eas::ArrayProto C12 = get_proto_cc(inputs.C1_26[11],strlen(inputs.C1_26[11]),dtype_s); + ::tensorflow::eas::ArrayProto C13 = get_proto_cc(inputs.C1_26[12],strlen(inputs.C1_26[12]),dtype_s); + ::tensorflow::eas::ArrayProto C14 = get_proto_cc(inputs.C1_26[13],strlen(inputs.C1_26[13]),dtype_s); + ::tensorflow::eas::ArrayProto C15 = get_proto_cc(inputs.C1_26[14],strlen(inputs.C1_26[14]),dtype_s); + ::tensorflow::eas::ArrayProto C16 = get_proto_cc(inputs.C1_26[15],strlen(inputs.C1_26[15]),dtype_s); + ::tensorflow::eas::ArrayProto C17 = get_proto_cc(inputs.C1_26[16],strlen(inputs.C1_26[16]),dtype_s); + ::tensorflow::eas::ArrayProto C18 = get_proto_cc(inputs.C1_26[17],strlen(inputs.C1_26[17]),dtype_s); + ::tensorflow::eas::ArrayProto C19 = get_proto_cc(inputs.C1_26[18],strlen(inputs.C1_26[18]),dtype_s); + ::tensorflow::eas::ArrayProto C20 = get_proto_cc(inputs.C1_26[19],strlen(inputs.C1_26[19]),dtype_s); + ::tensorflow::eas::ArrayProto C21 = get_proto_cc(inputs.C1_26[20],strlen(inputs.C1_26[20]),dtype_s); + ::tensorflow::eas::ArrayProto C22 = get_proto_cc(inputs.C1_26[21],strlen(inputs.C1_26[21]),dtype_s); + ::tensorflow::eas::ArrayProto C23 = get_proto_cc(inputs.C1_26[22],strlen(inputs.C1_26[22]),dtype_s); + ::tensorflow::eas::ArrayProto C24 = get_proto_cc(inputs.C1_26[23],strlen(inputs.C1_26[23]),dtype_s); + ::tensorflow::eas::ArrayProto C25 = get_proto_cc(inputs.C1_26[24],strlen(inputs.C1_26[24]),dtype_s); + ::tensorflow::eas::ArrayProto C26 = get_proto_cc(inputs.C1_26[25],strlen(inputs.C1_26[25]),dtype_s); + + + // PredictRequest + ::tensorflow::eas::PredictRequest req; + req.set_signature_name("serving_default"); + req.add_output_filter("Sigmoid:0"); + + (*req.mutable_inputs())["I1:0"] = 
I1; + (*req.mutable_inputs())["I2:0"] = I2; + (*req.mutable_inputs())["I3:0"] = I3; + (*req.mutable_inputs())["I4:0"] = I4; + (*req.mutable_inputs())["I5:0"] = I5; + (*req.mutable_inputs())["I6:0"] = I6; + (*req.mutable_inputs())["I7:0"] = I7; + (*req.mutable_inputs())["I8:0"] = I8; + (*req.mutable_inputs())["I9:0"] = I9; + (*req.mutable_inputs())["I10:0"] = I10; + (*req.mutable_inputs())["I11:0"] = I11; + (*req.mutable_inputs())["I12:0"] = I12; + (*req.mutable_inputs())["I13:0"] = I13; + (*req.mutable_inputs())["C1:0"] = C1; + (*req.mutable_inputs())["C2:0"] = C2; + (*req.mutable_inputs())["C3:0"] = C3; + (*req.mutable_inputs())["C4:0"] = C4; + (*req.mutable_inputs())["C5:0"] = C5; + (*req.mutable_inputs())["C6:0"] = C6; + (*req.mutable_inputs())["C7:0"] = C7; + (*req.mutable_inputs())["C8:0"] = C8; + (*req.mutable_inputs())["C9:0"] = C9; + (*req.mutable_inputs())["C10:0"] = C10; + (*req.mutable_inputs())["C11:0"] = C11; + (*req.mutable_inputs())["C12:0"] = C12; + (*req.mutable_inputs())["C13:0"] = C13; + (*req.mutable_inputs())["C14:0"] = C14; + (*req.mutable_inputs())["C15:0"] = C15; + (*req.mutable_inputs())["C16:0"] = C16; + (*req.mutable_inputs())["C17:0"] = C17; + (*req.mutable_inputs())["C18:0"] = C18; + (*req.mutable_inputs())["C19:0"] = C19; + (*req.mutable_inputs())["C20:0"] = C20; + (*req.mutable_inputs())["C21:0"] = C21; + (*req.mutable_inputs())["C22:0"] = C22; + (*req.mutable_inputs())["C23:0"] = C23; + (*req.mutable_inputs())["C24:0"] = C24; + (*req.mutable_inputs())["C25:0"] = C25; + (*req.mutable_inputs())["C26:0"] = C26; + + + size_t size = req.ByteSizeLong(); + void *buffer1 = malloc(size); + req.SerializeToArray(buffer1, size); + + // ----------------------------------------------process and get feedback--------------------------------------------------- + void* output = nullptr; + int output_size = 0; + state = process(model, buffer1, size, &output, &output_size); + + // parse response + std::string output_string((char*)output, output_size); + 
::tensorflow::eas::PredictResponse resp; + resp.ParseFromString(output_string); + std::cout << "process returned state: " << state << ", response: " << resp.DebugString(); + } + + //free memory + for(int i=0; i < rows;i++){ + free(all_elems[i]); + } + + return 0; +} + diff --git a/modelzoo/features/serving/README.md b/modelzoo/features/serving/README.md new file mode 100644 index 00000000000..5813d5b6c70 --- /dev/null +++ b/modelzoo/features/serving/README.md @@ -0,0 +1,62 @@ +## How to use prepare_savedmodel.py to get a savedmodel + +- Currently supported models: \ +BST, DBMTL, DeepFM, DIEN, DIN, DLRM, DSSM, ESMM, MMoE, SimpleMultiTask, WDL + +- Usage: \ + For every model listed above, there is a prepare_savedmodel.py. To run this script, please first ensure you have obtained the checkpoint file from training. To use prepare_savedmodel.py, please use: + + ``` + cd [modelfolder] + python prepare_savedmodel.py --checkpoint [ckpt path] + ``` + - If you chose --bf16 during the training stage, please also add it above. For example: + + ``` + cd [modelfolder] + python prepare_savedmodel.py --bf16 --checkpoint [ckpt path] + ``` + + + - Example: \ + This is an example for the BST model without the bf16 feature + ``` + cd modelzoo/BST + python prepare_savedmodel.py --checkpoint ./result/model_BST_1657777492 + ``` + + - Output: \ + The savedmodel will be stored under the ./savedmodels folder + + +## How to use start_serving.cc to retrieve serving results + +- Functionality: \ +start_serving.cc provides functionality such that you can get serving results after getting the savedmodel. + +- Parameter: + 1. At the start of main(), there is a file_path variable. Please substitute it with your own evaluation file path (the format should be the same as the training one) + 2. Please edit the savedmodel and checkpoint paths in the model_config, which is at the very beginning. + +- Usage: + 1. Please edit the BUILD file to add the start_serving target + 2. Please make sure you have installed the serving part properly + 3. 
Then go back to the DeepRec folder and proceed as follows, assuming the cc_binary name in BUILD is "wdl_demo": + ``` + bazel build //serving/processor/tests:wdl_demo + bazel-bin/serving/processor/tests/wdl_demo + ``` + + +## For the DIEN model +- Since the input of DIEN is more complicated than that of other models, we provide a generate_data.py to preprocess the data. + +- Please prepare data in the same format as training (the VOC and split files), then modify "data_location" in generate_data.py. + +- generate_data.py will produce a test.csv for the serving file to read. + +- The other procedures are similar to the above. + + + + diff --git a/modelzoo/features/serving/WDL/prepare_savedmodel.py b/modelzoo/features/serving/WDL/prepare_savedmodel.py new file mode 100644 index 00000000000..ce541fb782b --- /dev/null +++ b/modelzoo/features/serving/WDL/prepare_savedmodel.py @@ -0,0 +1,393 @@ +import time +import argparse +import tensorflow as tf +import os +import sys +import math +import collections +from tensorflow.python.client import timeline +import json +from tensorflow.python.training import incremental_saver as tf_incr_saver +from tensorflow.core.framework.embedding import config_pb2 +from tensorflow.python.ops import variables + + +CONTINUOUS_COLUMNS = ["I" + str(i) for i in range(1, 14)] # 1-13 inclusive +CATEGORICAL_COLUMNS = ["C" + str(i) for i in range(1, 27)] # 1-26 inclusive +LABEL_COLUMN = ["clicked"] +TRAIN_DATA_COLUMNS = LABEL_COLUMN + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS +FEATURE_COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS +IDENTIY_COLUMNS = ["I10"] +HASH_BUCKET_SIZES = { + 'C1': 2500, + 'C2': 2000, + 'C3': 300000, + 'C4': 250000, + 'C5': 1000, + 'C6': 100, + 'C7': 20000, + 'C8': 4000, + 'C9': 20, + 'C10': 100000, + 'C11': 10000, + 'C12': 250000, + 'C13': 40000, + 'C14': 100, + 'C15': 100, + 'C16': 200000, + 'C17': 50, + 'C18': 10000, + 'C19': 4000, + 'C20': 20, + 'C21': 250000, + 'C22': 100, + 'C23': 100, + 'C24': 250000, + 'C25': 400, + 'C26': 100000 +} + 
+IDENTITY_NUM_BUCKETS = {'I10': 10} + +EMBEDDING_DIMENSIONS = { + 'C1': 64, + 'C2': 64, + 'C3': 128, + 'C4': 128, + 'C5': 64, + 'C6': 64, + 'C7': 64, + 'C8': 64, + 'C9': 64, + 'C10': 128, + 'C11': 64, + 'C12': 128, + 'C13': 64, + 'C14': 64, + 'C15': 64, + 'C16': 128, + 'C17': 64, + 'C18': 64, + 'C19': 64, + 'C20': 64, + 'C21': 128, + 'C22': 64, + 'C23': 64, + 'C24': 128, + 'C25': 64, + 'C26': 128 +} + + +def add_layer_summary(value, tag): + tf.summary.scalar('%s/fraction_of_zero_values' % tag, + tf.nn.zero_fraction(value)) + tf.summary.histogram('%s/activation' % tag, value) + + +def build_feature_cols(train_file_path, test_file_path): + # Statistics of Kaggle's Criteo Dataset has been calculated in advance to save time + print('****Computing statistics of train dataset*****') + # with open(train_file_path, 'r') as f, open(test_file_path, 'r') as f1: + # nums = [line.strip('\n').split(',') for line in f.readlines() + # ] + [line.strip('\n').split(',') for line in f1.readlines()] + # numpy_arr = np.array(nums) + # mins_list, max_list, range_list = [], [], [] + # for i in range(len(TRAIN_DATA_COLUMNS)): + # if TRAIN_DATA_COLUMNS[i] in CONTINUOUS_COLUMNS: + # col_min = numpy_arr[:, i].astype(np.float32).min() + # col_max = numpy_arr[:, i].astype(np.float32).max() + # mins_list.append(col_min) + # max_list.append(col_max) + # range_list.append(col_max - col_min) + mins_list = [ + 0.0, -3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 + ] + range_list = [ + 1539.0, 22069.0, 65535.0, 561.0, 2655388.0, 233523.0, 26297.0, 5106.0, + 24376.0, 9.0, 181.0, 1807.0, 6879.0 + ] + + def make_minmaxscaler(min, range): + def minmaxscaler(col): + return (col - min) / range + + return minmaxscaler + + deep_columns = [] + wide_columns = [] + for column_name in FEATURE_COLUMNS: + if column_name in IDENTITY_NUM_BUCKETS: + categorical_column = tf.feature_column.categorical_column_with_identity( + column_name, num_buckets=IDENTITY_NUM_BUCKETS[column_name]) + 
wide_columns.append(categorical_column) + deep_columns.append( + tf.feature_column.indicator_column(categorical_column)) + elif column_name in CATEGORICAL_COLUMNS: + ev_option = None + if(args.ev_storage == "pmem_libpmem"): + ev_option = tf.EmbeddingVariableOption(storage_option=variables.StorageOption( + storage_type=config_pb2.StorageType.PMEM_LIBPMEM, + storage_path=args.ev_storage_path, + storage_size=[args.ev_storage_size_gb*1024*1024*1024])) + elif(args.ev_storage == "pmem_memkind"): + ev_option = tf.EmbeddingVariableOption(storage_option=variables.StorageOption( + storage_type=config_pb2.StorageType.PMEM_MEMKIND)) + elif(args.ev_storage == "dram_pmem"): + ev_option = tf.EmbeddingVariableOption(storage_option=variables.StorageOption( + storage_type=config_pb2.StorageType.DRAM_PMEM, + storage_path=args.ev_storage_path, + storage_size=[args.ev_storage_size_gb*1024*1024*1024, args.ev_storage_size_gb*1024*1024*1024])) + else: + ev_option = tf.EmbeddingVariableOption(storage_option=variables.StorageOption( + storage_type=config_pb2.StorageType.DRAM)) + categorical_column = tf.feature_column.categorical_column_with_embedding( + column_name, + dtype=tf.string, + ev_option=ev_option) + wide_columns.append(categorical_column) + + deep_columns.append( + tf.feature_column.embedding_column( + categorical_column, + dimension=EMBEDDING_DIMENSIONS[column_name], + combiner='mean')) + else: + normalizer_fn = None + i = CONTINUOUS_COLUMNS.index(column_name) + col_min = mins_list[i] + col_range = range_list[i] + normalizer_fn = make_minmaxscaler(col_min, col_range) + column = tf.feature_column.numeric_column( + column_name, normalizer_fn=normalizer_fn, shape=(1, )) + wide_columns.append(column) + deep_columns.append(column) + + return wide_columns, deep_columns + + +class WDL(): + def __init__(self, + wide_column=None, + deep_column=None, + dnn_hidden_units=[1024, 512, 256], + linear_learning_rate=0.2, + deep_learning_rate=0.01, + inputs=None, + bf16=False, + 
input_layer_partitioner=None, + dense_layer_partitioner=None, + saved_model=False): + + if not inputs: + raise ValueError('Dataset is not defined.') + self.wide_column = wide_column + self.deep_column = deep_column + if not wide_column or not deep_column: + raise ValueError('Wide column or Deep column is not defined.') + self.dnn_hidden_units = dnn_hidden_units + self.linear_learning_rate = linear_learning_rate + self.deep_learning_rate = deep_learning_rate + self.input_layer_partitioner = input_layer_partitioner + self.dense_layer_partitioner = dense_layer_partitioner + self.global_step = tf.train.get_or_create_global_step() + + self.feature = inputs + self.bf16 = bf16 + + self._is_training = False + + self.predict = self.prediction() + + def dnn(self, dnn_input, dnn_hidden_units=None, layer_name=''): + for layer_id, num_hidden_units in enumerate(dnn_hidden_units): + with tf.variable_scope(layer_name + "_%d" % layer_id, + partitioner=self.dense_layer_partitioner, + reuse=tf.AUTO_REUSE) as dnn_layer_scope: + dnn_input = tf.layers.dense( + dnn_input, + units=num_hidden_units, + activation=tf.nn.relu, + kernel_initializer=tf.glorot_uniform_initializer(), + name=dnn_layer_scope) + + add_layer_summary(dnn_input, dnn_layer_scope.name) + + return dnn_input + + def prediction(self): + # input features + self.dnn_parent_scope = 'dnn' + with tf.variable_scope(self.dnn_parent_scope): + with tf.variable_scope("input_from_feature_columns", + partitioner=self.input_layer_partitioner, + reuse=tf.AUTO_REUSE) as dnn_inputs_scope: + net = tf.feature_column.input_layer( + features=self.feature, feature_columns=self.deep_column) + + add_layer_summary(net, dnn_inputs_scope.name) + + if self.bf16: + net = tf.cast(net, dtype=tf.bfloat16) + with tf.variable_scope( + 'dnn_layers', + partitioner=self.dense_layer_partitioner, + reuse=tf.AUTO_REUSE).keep_weights(): + net = self.dnn(net, self.dnn_hidden_units, "hiddenlayer") + + with tf.variable_scope( + "logits", + values=(net, 
)).keep_weights() as dnn_logits_scope: + dnn_logits = tf.layers.dense(net, + units=1, + activation=None, + name=dnn_logits_scope) + add_layer_summary(dnn_logits, dnn_logits_scope.name) + dnn_logits = tf.cast(dnn_logits, dtype=tf.float32) + + else: + with tf.variable_scope( + 'dnn_layers', + partitioner=self.dense_layer_partitioner, + reuse=tf.AUTO_REUSE): + net = self.dnn(net, self.dnn_hidden_units, "hiddenlayer") + + with tf.variable_scope("logits", + values=(net, )) as dnn_logits_scope: + dnn_logits = tf.layers.dense(net, + units=1, + activation=None, + name=dnn_logits_scope) + add_layer_summary(dnn_logits, dnn_logits_scope.name) + + self.linear_parent_scope = 'linear' + with tf.variable_scope( + self.linear_parent_scope, + partitioner=self.input_layer_partitioner) as scope: + linear_logits = tf.feature_column.linear_model( + units=1, + features=self.feature, + feature_columns=self.wide_column, + sparse_combiner='sum', + weight_collections=None, + trainable=True) + + add_layer_summary(linear_logits, scope.name) + + self.logits = tf.add_n([dnn_logits, linear_logits]) + predict = tf.math.sigmoid(self.logits) + + return predict + + + +def get_arg_parser(): + parser = argparse.ArgumentParser() + + parser.add_argument('--batch_size', + help='Batch size to train. Default is 512', + type=int, + default=512) + + parser.add_argument('--bf16', + help='enable DeepRec BF16 in deep model. 
Default FP32', + action='store_true') + + parser.add_argument('--checkpoint', + type=str, + help='ckpt path', + default="") + parser.add_argument('--data_location', + help='Full path of train data', + required=False, + default='../data') + parser.add_argument('--ev_storage', + type=str, + choices=['dram', 'pmem_libpmem', 'pmem_memkind', 'dram_pmem'], + default='dram') + parser.add_argument('--linear_learning_rate', + help='Learning rate for linear model', + type=float, + default=0.2) + parser.add_argument('--deep_learning_rate', + help='Learning rate for deep model', + type=float, + default=0.01) + return parser + + +def main(tf_config=None, server=None): + + # set batch size & steps + batch_size = args.batch_size + train_file = args.data_location + '/train.csv' + test_file = args.data_location + '/eval.csv' + + # set fixed random seed + tf.set_random_seed(2021) + + # create data pipline + wide_column, deep_column = build_feature_cols(train_file, test_file) + + + is_saved_model = True + inputs = {} + # I1-I13 + for x in range(1, 10): + inputs['I'+str(x)] = tf.placeholder(tf.float32, + [None], name='I'+str(x)) + inputs['I10'] = tf.placeholder(tf.int64, [None], name='I10') + for x in range(11, 14): + inputs['I'+str(x)] = tf.placeholder(tf.float32, + [None], name='I'+str(x)) + # C1-C26 + for x in range(1, 27): + inputs['C'+str(x)] = tf.placeholder(tf.string, + [None], name='C'+str(x)) + real_input = inputs + + # create model + model = WDL(wide_column=wide_column, + deep_column=deep_column, + linear_learning_rate=args.linear_learning_rate, + deep_learning_rate=args.deep_learning_rate, + bf16=args.bf16, + inputs=real_input, + input_layer_partitioner=None, + dense_layer_partitioner=None, + saved_model=True) + + with tf.Session() as sess1: + + # Initialize saver + folder_dir = args.checkpoint + saver = tf.train.Saver() + + sess1.run(tf.global_variables_initializer()) + sess1.run(tf.local_variables_initializer()) + # Restore from checkpoint + 
saver.restore(sess1,tf.train.latest_checkpoint(folder_dir)) + + # Get save directory + dir = "./savedmodels" + os.makedirs(dir,exist_ok=True) + cc_time = int(time.time()) + saved_path = os.path.join(dir,str(cc_time)) + os.mkdir(saved_path) + + + tf.saved_model.simple_save( + sess1, + saved_path, + inputs = model.feature, + outputs = {"Sigmoid":model.predict} + ) + + +if __name__ == "__main__": + parser = get_arg_parser() + args = parser.parse_args() + + + main() + \ No newline at end of file diff --git a/modelzoo/features/serving/WDL/start_serving_wdl.cc b/modelzoo/features/serving/WDL/start_serving_wdl.cc new file mode 100644 index 00000000000..0467345927c --- /dev/null +++ b/modelzoo/features/serving/WDL/start_serving_wdl.cc @@ -0,0 +1,251 @@ +#include +#include +#include "serving/processor/serving/processor.h" +#include "serving/processor/serving/predict.pb.h" + +static const char* model_config = "{ \ + \"omp_num_threads\": 4, \ + \"kmp_blocktime\": 0, \ + \"feature_store_type\": \"memory\", \ + \"serialize_protocol\": \"protobuf\", \ + \"inter_op_parallelism_threads\": 10, \ + \"intra_op_parallelism_threads\": 10, \ + \"init_timeout_minutes\": 1, \ + \"signature_name\": \"serving_default\", \ + \"read_thread_num\": 3, \ + \"update_thread_num\": 2, \ + \"model_store_type\": \"local\", \ + \"checkpoint_dir\": \"/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/WDL/result/\", \ + \"savedmodel_dir\": \"/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/WDL/savedmodels/1658806107/\" \ + } "; + +struct input_format39array{ + float I1_9[9]; + int I10; + float I11_13[3]; + char* C1_26[26]; +}; + +::tensorflow::eas::ArrayProto get_proto_cc(void* char_input, int dim,::tensorflow::eas::ArrayDataType type){ + ::tensorflow::eas::ArrayShape array_shape; + array_shape.add_dim(1); + ::tensorflow::eas::ArrayProto input; + input.set_dtype(type); + + + switch(input.dtype()){ + case 1: + input.add_float_val(*((float*)char_input)); + break; + case 7: + 
input.add_string_val((char*)char_input); + break; + case 9: + input.add_int64_val(*((int*)char_input)); + break; + } + + *(input.mutable_array_shape()) = array_shape; + + return input; +} + + + +int main(int argc, char** argv) { + + // PLEASE EDIT THIS LINE!!!! + char filepath[] = "/home/deeprec/DeepRec/modelzoo/features/EmbeddingVariable/WDL/test.csv"; + + // // ------------------------------------------initialize serving model----------------------------------------- + int state; + void* model = initialize("", model_config, &state); + if (state == -1) { + std::cerr << "initialize error\n"; + } + + // // ---------------------------------------prepare serving data from file-------------------------------------- + + FILE *fp = nullptr; + char *line, *record; + char buffer2[1024]; + char delim[] = ","; + char end[] = "\n"; + int j = 0; + int rows = 0; + + + // get row number + if ( (fp = fopen(filepath,"at+")) != nullptr) { + while ((line = fgets(buffer2, sizeof(buffer2), fp)) != nullptr) rows++; + } + + fclose(fp); + + + // get rows + char* all_elems[rows*39]; + int cur_pos = 0; + + if ( (fp = fopen(filepath,"at+")) != nullptr) { + while ((line = fgets(buffer2, sizeof(buffer2), fp)) != NULL) { + record = strtok(line, delim); + while (record != NULL) { + // only 1 label and 39 feature + if (j >= 40) break; + // disragard label + if (j == 0) {j++; record = strtok(NULL,delim); continue;} + char* cur_item = (char*) malloc(sizeof(char)*strlen(record)); + strcpy(cur_item,record); + if (cur_item[strlen(cur_item)-1] == *end) cur_item[strlen(cur_item)-1] = '\0'; + all_elems[cur_pos] = cur_item; + + cur_pos++; + record = strtok(NULL, delim); + j++; + + } + j = 0; + + } + } + + fclose(fp); + + + + // ----------------------------------------------prepare request input---------------------------------------------------- + for(int ii = 0; ii < rows; ii++ ){ + int start_idx = ii * 39; + + struct input_format39array inputs; + for(int jj = 0; jj < 39; jj++){ + if( jj >= 0 && jj <= 8 
) inputs.I1_9[jj] = (float)(atof(all_elems[start_idx + jj])); + else if( jj == 9) inputs.I10 = (int)atoi(all_elems[start_idx+9]); + else if( jj >= 10 && jj <= 12) inputs.I11_13[jj-10] = (float)(atof(all_elems[start_idx + jj])); + else inputs.C1_26[jj-13] = (char*)all_elems[start_idx+jj]; + } + + + // get input type + ::tensorflow::eas::ArrayDataType dtype_f = + ::tensorflow::eas::ArrayDataType::DT_FLOAT; + + ::tensorflow::eas::ArrayDataType dtype_s = + ::tensorflow::eas::ArrayDataType::DT_STRING; + + ::tensorflow::eas::ArrayDataType dtype_i = + ::tensorflow::eas::ArrayDataType::DT_INT64; + + // input setting + ::tensorflow::eas::ArrayProto I1 = get_proto_cc(&inputs.I1_9[0],1,dtype_f); + ::tensorflow::eas::ArrayProto I2 = get_proto_cc(&inputs.I1_9[1],1,dtype_f); + ::tensorflow::eas::ArrayProto I3 = get_proto_cc(&inputs.I1_9[2],1,dtype_f); + ::tensorflow::eas::ArrayProto I4 = get_proto_cc(&inputs.I1_9[3],1,dtype_f); + ::tensorflow::eas::ArrayProto I5 = get_proto_cc(&inputs.I1_9[4],1,dtype_f); + ::tensorflow::eas::ArrayProto I6 = get_proto_cc(&inputs.I1_9[5],1,dtype_f); + ::tensorflow::eas::ArrayProto I7 = get_proto_cc(&inputs.I1_9[6],1,dtype_f); + ::tensorflow::eas::ArrayProto I8 = get_proto_cc(&inputs.I1_9[7],1,dtype_f); + ::tensorflow::eas::ArrayProto I9 = get_proto_cc(&inputs.I1_9[8],1,dtype_f); + ::tensorflow::eas::ArrayProto I10 = get_proto_cc(&inputs.I10,1,dtype_i); + ::tensorflow::eas::ArrayProto I11 = get_proto_cc(&inputs.I11_13[0],1,dtype_f); + ::tensorflow::eas::ArrayProto I12 = get_proto_cc(&inputs.I11_13[1],1,dtype_f); + ::tensorflow::eas::ArrayProto I13 = get_proto_cc(&inputs.I11_13[2],1,dtype_f); + ::tensorflow::eas::ArrayProto C1 = get_proto_cc(inputs.C1_26[0],strlen(inputs.C1_26[0]),dtype_s); + ::tensorflow::eas::ArrayProto C2 = get_proto_cc(inputs.C1_26[1],strlen(inputs.C1_26[1]),dtype_s); + ::tensorflow::eas::ArrayProto C3 = get_proto_cc(inputs.C1_26[2],strlen(inputs.C1_26[2]),dtype_s); + ::tensorflow::eas::ArrayProto C4 = 
get_proto_cc(inputs.C1_26[3],strlen(inputs.C1_26[3]),dtype_s); + ::tensorflow::eas::ArrayProto C5 = get_proto_cc(inputs.C1_26[4],strlen(inputs.C1_26[4]),dtype_s); + ::tensorflow::eas::ArrayProto C6 = get_proto_cc(inputs.C1_26[5],strlen(inputs.C1_26[5]),dtype_s); + ::tensorflow::eas::ArrayProto C7 = get_proto_cc(inputs.C1_26[6],strlen(inputs.C1_26[6]),dtype_s); + ::tensorflow::eas::ArrayProto C8 = get_proto_cc(inputs.C1_26[7],strlen(inputs.C1_26[7]),dtype_s); + ::tensorflow::eas::ArrayProto C9 = get_proto_cc(inputs.C1_26[8],strlen(inputs.C1_26[8]),dtype_s); + ::tensorflow::eas::ArrayProto C10 = get_proto_cc(inputs.C1_26[9],strlen(inputs.C1_26[9]),dtype_s); + ::tensorflow::eas::ArrayProto C11 = get_proto_cc(inputs.C1_26[10],strlen(inputs.C1_26[10]),dtype_s); + ::tensorflow::eas::ArrayProto C12 = get_proto_cc(inputs.C1_26[11],strlen(inputs.C1_26[11]),dtype_s); + ::tensorflow::eas::ArrayProto C13 = get_proto_cc(inputs.C1_26[12],strlen(inputs.C1_26[12]),dtype_s); + ::tensorflow::eas::ArrayProto C14 = get_proto_cc(inputs.C1_26[13],strlen(inputs.C1_26[13]),dtype_s); + ::tensorflow::eas::ArrayProto C15 = get_proto_cc(inputs.C1_26[14],strlen(inputs.C1_26[14]),dtype_s); + ::tensorflow::eas::ArrayProto C16 = get_proto_cc(inputs.C1_26[15],strlen(inputs.C1_26[15]),dtype_s); + ::tensorflow::eas::ArrayProto C17 = get_proto_cc(inputs.C1_26[16],strlen(inputs.C1_26[16]),dtype_s); + ::tensorflow::eas::ArrayProto C18 = get_proto_cc(inputs.C1_26[17],strlen(inputs.C1_26[17]),dtype_s); + ::tensorflow::eas::ArrayProto C19 = get_proto_cc(inputs.C1_26[18],strlen(inputs.C1_26[18]),dtype_s); + ::tensorflow::eas::ArrayProto C20 = get_proto_cc(inputs.C1_26[19],strlen(inputs.C1_26[19]),dtype_s); + ::tensorflow::eas::ArrayProto C21 = get_proto_cc(inputs.C1_26[20],strlen(inputs.C1_26[20]),dtype_s); + ::tensorflow::eas::ArrayProto C22 = get_proto_cc(inputs.C1_26[21],strlen(inputs.C1_26[21]),dtype_s); + ::tensorflow::eas::ArrayProto C23 = 
get_proto_cc(inputs.C1_26[22],strlen(inputs.C1_26[22]),dtype_s); + ::tensorflow::eas::ArrayProto C24 = get_proto_cc(inputs.C1_26[23],strlen(inputs.C1_26[23]),dtype_s); + ::tensorflow::eas::ArrayProto C25 = get_proto_cc(inputs.C1_26[24],strlen(inputs.C1_26[24]),dtype_s); + ::tensorflow::eas::ArrayProto C26 = get_proto_cc(inputs.C1_26[25],strlen(inputs.C1_26[25]),dtype_s); + + + // PredictRequest + ::tensorflow::eas::PredictRequest req; + req.set_signature_name("serving_default"); + req.add_output_filter("Sigmoid:0"); + + (*req.mutable_inputs())["I1:0"] = I1; + (*req.mutable_inputs())["I2:0"] = I2; + (*req.mutable_inputs())["I3:0"] = I3; + (*req.mutable_inputs())["I4:0"] = I4; + (*req.mutable_inputs())["I5:0"] = I5; + (*req.mutable_inputs())["I6:0"] = I6; + (*req.mutable_inputs())["I7:0"] = I7; + (*req.mutable_inputs())["I8:0"] = I8; + (*req.mutable_inputs())["I9:0"] = I9; + (*req.mutable_inputs())["I10:0"] = I10; + (*req.mutable_inputs())["I11:0"] = I11; + (*req.mutable_inputs())["I12:0"] = I12; + (*req.mutable_inputs())["I13:0"] = I13; + (*req.mutable_inputs())["C1:0"] = C1; + (*req.mutable_inputs())["C2:0"] = C2; + (*req.mutable_inputs())["C3:0"] = C3; + (*req.mutable_inputs())["C4:0"] = C4; + (*req.mutable_inputs())["C5:0"] = C5; + (*req.mutable_inputs())["C6:0"] = C6; + (*req.mutable_inputs())["C7:0"] = C7; + (*req.mutable_inputs())["C8:0"] = C8; + (*req.mutable_inputs())["C9:0"] = C9; + (*req.mutable_inputs())["C10:0"] = C10; + (*req.mutable_inputs())["C11:0"] = C11; + (*req.mutable_inputs())["C12:0"] = C12; + (*req.mutable_inputs())["C13:0"] = C13; + (*req.mutable_inputs())["C14:0"] = C14; + (*req.mutable_inputs())["C15:0"] = C15; + (*req.mutable_inputs())["C16:0"] = C16; + (*req.mutable_inputs())["C17:0"] = C17; + (*req.mutable_inputs())["C18:0"] = C18; + (*req.mutable_inputs())["C19:0"] = C19; + (*req.mutable_inputs())["C20:0"] = C20; + (*req.mutable_inputs())["C21:0"] = C21; + (*req.mutable_inputs())["C22:0"] = C22; + (*req.mutable_inputs())["C23:0"] = C23; 
+ (*req.mutable_inputs())["C24:0"] = C24; + (*req.mutable_inputs())["C25:0"] = C25; + (*req.mutable_inputs())["C26:0"] = C26; + + + size_t size = req.ByteSizeLong(); + void *buffer1 = malloc(size); + req.SerializeToArray(buffer1, size); + + // ----------------------------------------------process and get feedback--------------------------------------------------- + void* output = nullptr; + int output_size = 0; + state = process(model, buffer1, size, &output, &output_size); + + // parse response + std::string output_string((char*)output, output_size); + ::tensorflow::eas::PredictResponse resp; + resp.ParseFromString(output_string); + std::cout << "process returned state: " << state << ", response: " << resp.DebugString(); + } + + //free memory + for(int i=0; i < rows;i++){ + free(all_elems[i]); + } + + return 0; +} +