Solutions to get the code running #8

GeYuYao-hub opened this issue Mar 18, 2023 · 2 comments

My solution for ijcnn19attacks

If I solved your problem, please reply to this issue to let me know.

clone from github

Creating a conda environment

  • python = 3.9.15


  • keras = 2.10.0
  • numpy = 1.24.2
  • pandas = 1.3.5
  • scikit-learn = 1.0.2
  • scipy = 1.7.3
  • matplotlib = 3.6.2
  • tensorflow-gpu = 2.10.1
  • h5py = 3.7.0



Modify the code for the main. py header

import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import MDS
import keras
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
import sys
import time
import os
import math
import tensorflow.compat.v1 as tf
import cleverhans_tutorials.tsc_tutorial_keras_tf as attack


Add the following code

import tensorflow.compat.v1 as tf 


Add the following code

import tensorflow.compat.v1 as tf 


Delete the original content in line 215 and replace it with the following

import tensorflow.compat.v1 as tf 


Add the following code

import tensorflow.compat.v1 as tf 

Modify def fprop to the following

 def fprop(self, x):
        Exposes all the layers of the model returned by get_layer_names.
        :param x: A symbolic representation of the network input
        :return: A dictionary mapping layer names to the symbolic
                 representation of their output.
        global fprop_dict
        from keras.models import Model as KerasModel

        if self.keras_model is None:
            # Get the input layer
            # self.model(tf.keras.Input(shape = (270,1)))
            # new_input = self.model.get_input_at(0)
            new_input = self.model.inputs

            # Make a new model that returns each of the layers as output
            out_layers = [x_layer.output for x_layer in self.model.layers]
            # print(out_layers)
            self.keras_model = KerasModel(new_input, out_layers)

            # and get the outputs for that model on the input x
            outputs = self.keras_model(x)

            # Keras only returns a list for outputs of length >= 1, if the model
            # is only one layer, wrap a list
            if len(self.model.layers) == 1:
                outputs = [outputs]

            # compute the dict to return
            fprop_dict = dict(zip(self.get_layer_names(), outputs))

        return fprop_dict


Cover with the following

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import sklearn
import tensorflow as tf
from tensorflow.python.platform import flags
import numpy as np
import keras
from keras import backend
import pandas as pd

from cleverhans_copy.attacks import FastGradientMethod
from cleverhans_copy.attacks import BasicIterativeMethod
from cleverhans_copy.utils import AccuracyReport
from cleverhans_copy.utils_keras import KerasModelWrapper
from cleverhans_copy.utils_tf import model_eval

from sklearn.preprocessing import LabelEncoder

import tensorflow.compat.v1 as tf  # 使用1.0版本的方法

tf.disable_v2_behavior()  # 禁用2.0版本的方法



def readucr(filename):
    data = np.loadtxt(filename, delimiter=',')
    Y = data[:, 0]
    X = data[:, 1:]
    return X, Y

def read_dataset(root_dir, archive_name, dataset_name):
    datasets_dict = {}

    file_name = root_dir + '/archives/' + archive_name + '/' + dataset_name + '/' + dataset_name
    x_train, y_train = readucr(file_name + '_TRAIN')
    x_test, y_test = readucr(file_name + '_TEST')
    datasets_dict[dataset_name] = (x_train.copy(), y_train.copy(), x_test.copy(),

    return datasets_dict

def transform_labels(y_train,y_test):
    Transform label to min equal zero and continuous
    For example if we have [1,3,4] --->  [0,1,2]
    # init the encoder
    encoder = LabelEncoder()
    # concat train and test to fit
    y_train_test = np.concatenate((y_train,y_test),axis =0)
    # fit the encoder
    # transform to min zero and continuous labels
    new_y_train_test = encoder.transform(y_train_test)
    # resplit the train and test
    new_y_train = new_y_train_test[0:len(y_train)]
    new_y_test = new_y_train_test[len(y_train):]
    return new_y_train, new_y_test

def prepare_data(datasets_dict,dataset_name):
    x_train = datasets_dict[dataset_name][0]
    y_train = datasets_dict[dataset_name][1]
    x_test = datasets_dict[dataset_name][2]
    y_test = datasets_dict[dataset_name][3]

    nb_classes = len(np.unique(np.concatenate((y_train, y_test), axis=0)))

    # make the min to zero of labels
    y_train, y_test = transform_labels(y_train, y_test)

    # save orignal y because later we will use binary
    y_true = y_test.astype(np.int64)
    # transform the labels from integers to one hot vectors
    enc = sklearn.preprocessing.OneHotEncoder(), y_test), axis=0).reshape(-1, 1))
    y_train = enc.transform(y_train.reshape(-1, 1)).toarray()
    y_test = enc.transform(y_test.reshape(-1, 1)).toarray()

    if len(x_train.shape) == 2:  # if univariate
        # add a dimension to make it multivariate with one dimension
        x_train = x_train.reshape((x_train.shape[0], x_train.shape[1], 1))
        x_test = x_test.reshape((x_test.shape[0], x_test.shape[1], 1))

    return x_train, y_train, x_test, y_test,y_true, nb_classes

def create_directory(directory_path):
    if os.path.exists(directory_path):
        return None
    return directory_path

def add_labels_to_adv_test_set(dataset_dict,dataset_name, adv_data_dir,original_y):
    x_test_perturbed = np.loadtxt(adv_data_dir+dataset_name+'-adv', delimiter=',')
    test_set = np.zeros((original_y.shape[0],x_test_perturbed.shape[1]+1),dtype=np.float64)
    test_set[:,0] = original_y
    test_set[:,1:] = x_test_perturbed

def tsc_tutorial(attack_method='fgsm',batch_size=BATCH_SIZE,

    # keras.layers.core.K.set_learning_phase(0)

    # Object used to keep track of (and return) key accuracies
    report = AccuracyReport()

    # Set TF random seed to improve reproducibility

    if not hasattr(backend, "tf"):
        raise RuntimeError("This tutorial requires keras to be configured"
                           " to use the TensorFlow backend.")

    # if keras.backend.image_dim_ordering() != 'tf':
    #     keras.backend.set_image_dim_ordering('tf')
    #     print("INFO: '~/.keras/keras.json' sets 'image_dim_ordering' to "
    #           "'th', temporarily setting to 'tf'")

    # Create TF session and set as Keras backend session
    sess = tf.Session()

    # dataset_name = 'Adiac'
    classifier_name = 'resnet'
    root_dir = 'D:/Project/ijcnn19attacks/'

    archive_name = 'TSC'
    out_dir = 'ucr-attack/'
    file_path = root_dir + 'results/' + classifier_name + '/' + archive_name + \
                '/' + dataset_name + '/best_model.hdf5'

    adv_data_dir = out_dir+attack_method+'/'+archive_name+'/'+attack_on+\

    if os.path.exists(adv_data_dir+dataset_name+'-adv'):

    dataset_dict = read_dataset(root_dir, archive_name, dataset_name)

    x_train, y_train, x_test, y_test, _, nb_classes = prepare_data(dataset_dict,dataset_name)

    if attack_on == 'train':
        X = x_train
        Y = y_train
        original_y = dataset_dict[dataset_name][1]
    elif attack_on =='test':
        X = x_test
        Y = y_test
        original_y = dataset_dict[dataset_name][3]
        print('Error either train or test options for attack_on param')

    # for big datasets we should decompose in batches the evaluation of the attack
    # loop through the batches
    ori_acc = 0
    adv_acc = 0

    res_dir = out_dir + 'results'+attack_method+'.csv'
    if os.path.exists(res_dir):
        res_ori = pd.read_csv(res_dir, index_col=False)
        res_ori = pd.DataFrame(data=np.zeros((0, 3), dtype=float), index=[],
                               columns=['dataset_name', 'ori_acc', 'adv_acc'])

    test_set = np.zeros((Y.shape[0], x_train.shape[1] + 1), dtype=np.float64)

    for i in range(0,len(X),batch_size):
        curr_X = X[i:i+batch_size]
        curr_Y = Y[i:i+batch_size]

        # Obtain series Parameters
        img_rows, nchannels = x_train.shape[1:3]

        # Define input TF placeholder
        x = tf.placeholder(tf.float32, shape=(None, img_rows, nchannels))
        y = tf.placeholder(tf.float32, shape=(None, nb_classes))

        # Define TF model graph
        model = keras.models.load_model(file_path)
        preds = model(x)
        print("Defined TensorFlow model graph.")

        def evaluate():
            # Evaluate the accuracy of the model on legitimate test examples
            eval_params = {'batch_size': batch_size}
            acc = model_eval(sess, x, y, preds, curr_X, curr_Y, args=eval_params)
            report.clean_train_clean_eval = acc
            print('Test accuracy on legitimate examples: %0.4f' % acc)
            return acc

        wrap = KerasModelWrapper(model)

        ori_acc += evaluate() * len(curr_X)/len(X)

        if attack_method == 'fgsm':
            # Initialize the Fast Gradient Sign Method (FGSM) attack object and graph
            fgsm = FastGradientMethod(wrap, sess=sess)
            fgsm_params = {'eps': eps }
            adv_x = fgsm.generate(x, **fgsm_params)
        elif attack_method == 'bim':
            # BasicIterativeMethod
            bim = BasicIterativeMethod(wrap,sess=sess)
            bim_params = {'eps':eps, 'eps_iter':0.05, 'nb_iter':10}
            adv_x = bim.generate(x,**bim_params)
            print('Either bim or fgsm are acceptable as attack methods')

        # Consider the attack to be constant
        adv_x = tf.stop_gradient(adv_x)

        adv = adv_x.eval({x: curr_X}, session=sess)
        adv = adv.reshape(adv.shape[0],adv.shape[1])

        preds_adv = model(adv_x)

        # Evaluate the accuracy of the model on adversarial examples
        eval_par = {'batch_size': batch_size}
        acc = model_eval(sess, x, y, preds_adv, curr_X, curr_Y, args=eval_par)
        print('Test accuracy on adversarial examples: %0.4f\n' % acc)
        report.clean_train_adv_eval = acc
        adv_acc += acc * len(curr_X)/len(X)

        test_set[i:i+batch_size,0] = original_y[i:i+batch_size]
        test_set[i:i+batch_size,1:] = adv


    np.savetxt(adv_data_dir+dataset_name+'-adv',test_set, delimiter=',')

    add_labels_to_adv_test_set(dataset_dict, dataset_name, adv_data_dir,original_y)

    res = pd.DataFrame(data = np.zeros((1,3),dtype=float), index=[0],
    res['dataset_name'] = dataset_name+str(eps)
    res['ori_acc'] = ori_acc
    res['adv_acc'] = adv_acc
    res_ori = pd.concat((res_ori,res),sort=False)

    return report

def main(argv=None,attack_method='fgsm'):
    flags.DEFINE_integer('batch_size', BATCH_SIZE, 'Size of training batches')

    # full datasets
    dataset_names = ['50words', 'Adiac', 'ArrowHead', 'Beef', 'BeetleFly', 'BirdChicken', 'Car', 'CBF',
                     'ChlorineConcentration', 'CinC_ECG_torso', 'Coffee',
                     'Computers', 'Cricket_X', 'Cricket_Y', 'Cricket_Z', 'DiatomSizeReduction',
                     'DistalPhalanxOutlineAgeGroup', 'DistalPhalanxOutlineCorrect', 'DistalPhalanxTW',
                     'Earthquakes', 'ECG200', 'ECG5000', 'ECGFiveDays', 'ElectricDevices', 'FaceAll', 'FaceFour',
                     'FacesUCR', 'FISH', 'FordA', 'FordB', 'Gun_Point', 'Ham', 'HandOutlines',
                     'Haptics', 'Herring', 'InlineSkate', 'InsectWingbeatSound', 'ItalyPowerDemand',
                     'LargeKitchenAppliances', 'Lighting2', 'Lighting7', 'MALLAT', 'Meat', 'MedicalImages',
                     'MiddlePhalanxOutlineAgeGroup', 'MiddlePhalanxOutlineCorrect', 'MiddlePhalanxTW', 'MoteStrain',
                     'NonInvasiveFatalECG_Thorax1', 'NonInvasiveFatalECG_Thorax2', 'OliveOil',
                     'OSULeaf', 'PhalangesOutlinesCorrect', 'Phoneme', 'Plane', 'ProximalPhalanxOutlineAgeGroup',
                     'ProximalPhalanxOutlineCorrect', 'ProximalPhalanxTW', 'RefrigerationDevices',
                     'ScreenType', 'ShapeletSim', 'ShapesAll', 'SmallKitchenAppliances', 'SonyAIBORobotSurface',
                     'SonyAIBORobotSurfaceII', 'StarLightCurves', 'Strawberry', 'SwedishLeaf', 'Symbols',
                     'synthetic_control', 'ToeSegmentation1', 'ToeSegmentation2', 'Trace', 'TwoLeadECG', 'Two_Patterns',
                     'UWaveGestureLibraryAll', 'uWaveGestureLibrary_X', 'uWaveGestureLibrary_Y',
                     'uWaveGestureLibrary_Z', 'wafer', 'Wine', 'WordsSynonyms', 'Worms', 'WormsTwoClass', 'yoga']

    # dataset_names = ['Coffee']

    # epsilons = np.arange(start=0.0,stop=2.0,step=0.025,dtype=np.float32)
    epsilons = [0.1]

    for dataset_name in dataset_names:
        for ep in epsilons:

                           eps = ep)

# if __name__ == '__main__':
#     flags.DEFINE_integer('nb_epochs', NB_EPOCHS,
#                          'Number of epochs to train model')
#     flags.DEFINE_integer('batch_size', BATCH_SIZE, 'Size of training batches')
#     flags.DEFINE_float('learning_rate', LEARNING_RATE,
#                        'Learning rate for training')
#     flags.DEFINE_string('train_dir', TRAIN_DIR,
#                         'Directory where to save model.')
#     flags.DEFINE_string('filename', FILENAME, 'Checkpoint filename.')
#     flags.DEFINE_boolean('load_model', LOAD_MODEL,
#                          'Load saved model or train.')
Hello, can you try to see if the code runs without the errors mentioned in the other issues ? if it does work can you do a PR for it ? Thanks

@GeYuYao-hub Thanks for your comments. There are one another minor comments on your instruction:
The numpy 1.24.2 conflicts with other requirements
" The user requested numpy==1.24.2
pandas 1.3.5 depends on numpy>=1.17.3; platform_machine != "aarch64" and platform_machine != "arm64" and python_version < "3.10"
scikit-learn 1.0.2 depends on numpy>=1.14.6
scipy 1.7.3 depends on numpy<1.23.0 and >=1.16.5

Therefor pip-requirement.txt could be like:

