Data Analysis/prj HAIContest2021

HAICon 2021 - prj(full)

BS Ryu 2021. 9. 18. 22:13

대회링크

 

HAICon2021 산업제어시스템 보안위협 탐지 AI 경진대회 - DACON

[대회명] 산업제어시스템 보안위협 탐지 AI 경진대회 [주제] 산업제어시스템 보안위협 탐지 [배경] 최근 국가기반시설 및 산업시설의 제어시스템에 대한 사이버 보안위협이 지속적으로 증가하

dacon.io

우선 아직 진행중인 Contest이긴하지만, 성적이 잘 나오지 않으며 스터디 세미나를 하고있기때문에, 뒤로한채 블로깅을 시작한다. 8/30부로 시작한 프로젝트이고 전체적인 흐름에 이해한 바를 설명하겠다.

 

Baseline Code를 기반으로 대부분 설정했있음을 미리 고지한다.

 

# 시계열 데이터 # RNN # GRU

 

from google.colab import drive
drive.mount('/content/gdrive/')

# 최종 Evaluate Package 설치
!pip install ./gdrive/MyDrive/멋사_study/HAICon2021/HAICon2021_dataset/eTaPR-21.8.2-py3-none-any.whl

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import datetime
import time
import torch
from torch.utils.data import Dataset, DataLoader

import os
import pickle
import sys
from pathlib import Path
from datetime import timedelta
import dateutil
from tqdm.notebook import trange
from TaPR_pkg import etapr

우선 colab환경에서 진행했고, google drive에 Data및 평가 파일을 설치했다.

import 라인은 위와 같고, Pytorch를 main으로 활용했다.

참고로 TaPR_pkg는 링크에 전부 공개되어있다.

 

1. Data 불러오기

 

# Data Path Load
path_train = sorted([x for x in Path("./gdrive/MyDrive/멋사_study/HAICon2021/HAICon2021_dataset/train").glob("*.csv")]) 
path_test = sorted([x for x in Path("./gdrive/MyDrive/멋사_study/HAICon2021/HAICon2021_dataset/test").glob("*.csv")]) 
path_validation = sorted([x for x in Path("./gdrive/MyDrive/멋사_study/HAICon2021/HAICon2021_dataset/validation").glob("*.csv")]) 

def csv_to_df(target):
    return pd.read_csv(target)

def df_data_load(targets):
    return pd.concat([csv_to_df(x) for x in targets])

# Min-Max Regularization
def normalize(raw):
    ndf = raw.copy()
    for c in raw.columns:
        #min-max가 0인경우에는 무의미한 col으로 생각하고, 0으로 채워줌
        if raw_train[useful_cols].min()[c] == raw_train[useful_cols].max()[c]:
            ndf[c] = raw[c] - raw_train[useful_cols].min()[c]
        else:
            ndf[c] = (raw[c] - raw_train[useful_cols].min()[c]) / (raw_train[useful_cols].max()[c] - raw_train[useful_cols].min()[c])
    return ndf

# min_max reg를 진행하고, boundary인 [0,1] 이외의 값이 존재하는지 체크하는 함수
# train data에 대해서는, Reguralizaion 제대로 됐는지 체크
def boundary_check(df):
    x = np.array(df, dtype=np.float32)
    return np.any(x > 1.0), np.any(x < 0), np.any(np.isnan(x))

raw_train = df_data_load(path_train) # timestamp,C01~C86
raw_validation = df_data_load(path_validation) # timestamp,C01~C86, attack
raw_test = df_data_load(path_test) # timestamp,C01~C86

# time stamp는 attack event와 관련이 없는 관계로 삭제한다.
# time stamp 삭제는 normalize 시에 에러 발생하는 사유도 있다.

useful_cols = raw_train.columns.drop('timestamp')
drop_cols = ['C02','C09', 'C10', 'C18', 'C19', 'C22', 'C26', 'C29', 'C36', 'C38', 'C39', 'C49', 'C52', 'C55', 'C63', 'C69', 'C82', 'C85']
useful_cols = useful_cols.drop(drop_cols)


# drop_cols의 배경은 아래 주석과 같다.
## train, val, test data에서 min = max현상이 공통적으로 발생하는 feature는 제외하기로한다.

# train_drop_col_lst=[]
# for i in enumerate(raw_train[useful_cols].columns):
#   if raw_train[useful_cols][i[1]].max()-raw_train[useful_cols][i[1]].min() == 0:
#     train_drop_col_lst.append(i[1])

# validation_drop_col_lst=[]
# for i in enumerate(raw_validation[useful_cols].columns):
#   if raw_validation[useful_cols][i[1]].max()-raw_validation[useful_cols][i[1]].min() == 0:
#     validation_drop_col_lst.append(i[1])

# test_drop_col_lst=[]
# for i in enumerate(raw_test[useful_cols].columns):
#   if raw_test[useful_cols][i[1]].max()-raw_test[useful_cols][i[1]].min() == 0:
#     test_drop_col_lst.append(i[1])

# drop_cols = list(set(set(train_drop_col_lst)&set(validation_drop_col_lst)&set(test_drop_col_lst)))
# drop_cols.sort()

 

 

Data를 불러오는 과정은 pathlib의 Path 모듈을 사용했는데, folder내에 있는 파일에서 csv로 끝이나는 모든 파일을 불러오는 방식을 채택했다.

df_data_load 함수 내부에서 csv_to_df함수는 csv 파일을 불러오고, df_data_load 함수는 concat처리해주면서 Data 불러오기가 시작된다.

 

이후 Normalize가 진행되는데, Min-Max Regularization이지만, min과 max가 같은 feature의 경우는 배제해주었다. 향후에 drop cols에서 전부 제외처리를 해주게 된다. drop_cols하게되는 배경은 하단 주석에 내용이 나온다.

 

boundary_check 함수는, train을 기반으로 만들어진다. train data는 전부다 정상데이터만 존재하는 사유로, train data를 통과시키게 되면 (False,False,False)가 나온다. 각각 의미하는바는, normalize한 값이 0~1 외에 있는지와 null값이 존재하는지 이다.

 

# Exponential Weighted Function 통과
# 센서에서 발생하는 noise를 **smoothing** 시켜주기 위해 적용
df_train= normalize(raw_train[useful_cols]).ewm(alpha=0.9).mean()
df_validation = normalize(raw_validation[useful_cols])
df_test= normalize(raw_test[useful_cols]).ewm(alpha=0.9).mean()

이후에 EWM을 통과시킨다. Exponential Weighted Function을 통과시켜주는데, EWM은 지수평활이동평균이다.

간단하게만 설명을 해보자면, 이와 같다.

v(n+2) = (1-alpha) * ((1-alpha)*v(n)+alpha*theta(n+1)) + alpha * theta(n+2)

v(n) : ewm을 통과시킨 값; n번째 ewm값
theta(n) : 실제값 (smoothing 이전의 값)
alpha = 1- beta

n+2시점에 Exponential Smoothing한 값은 v(n) 즉, -2번째 시퀀스에 있는 예측값과, 직전 시퀀스에 있는 실제값 에 종속된다. 또한 현재 시점(n+2)시점 Data에도 역시 영향을 받는다. 

그 정도를 말하는것이 alpha이고, alpha가 1이면 실제 데이터와 완벽히 동일하고, 0에 가까워질수록 데이터는 보다 Smooth해진다.

그림이 좀 허접하긴한데, 목적자체는 이렇다. Anomaly state를 예측하는 모델을 만들건데, 지나치게 Noise가 많이 발생하는 수가 있다. 기본적으로 Anomaly state는 하나의 time stamp에서만 존재하지않고, 쭉 이어지게 된다. 이 두가지의 사유로 Smoothing을 진행하는게 시계열 데이터에서는 기본적으로 행해지는 편이다.

 

# normalize 완료된 df 불러오기
df_train = pd.read_pickle("/content/gdrive/MyDrive/멋사_study/HAICon2021/df_train.pickle")
df_validation = pd.read_pickle("/content/gdrive/MyDrive/멋사_study/HAICon2021/df_validation.pickle")
df_test = pd.read_pickle("/content/gdrive/MyDrive/멋사_study/HAICon2021/df_test.pickle")

print(boundary_check(df_train))
print(boundary_check(df_validation))
print(boundary_check(df_test))

# output
# (False, False, False)
# (True, True, False)
# (True, True, False)

암튼 이렇게 Normalize까지 끝나고, boundary_check 함수를 통과시키면 주석과 같은 output이 생긴다.

 

2. Data Pipeline 구축

WINDOW_SIZE = 60
WINDOW_GIVEN = WINDOW_SIZE-1


class HaiDataset(Dataset): # pytorch의 Dataset class 활용
    def __init__(self, timestamps, df, stride=1, attacks=None):
        self.ts = np.array(timestamps)
        self.tag_values = np.array(df, dtype=np.float32) 
        self.valid_idxs = []
        for L in trange(len(self.ts) - WINDOW_SIZE + 1): # L 1번째,  100번째 ts 9461번째
            R = L + WINDOW_SIZE - 1 # R 60번째, 159번째. 10000번째.
            if dateutil.parser.parse(self.ts[R]) - dateutil.parser.parse( # 60번째-1번째 = 59 . 이게 윈도우사이즈 크기와 동일한지 점검하는 조건
                self.ts[L]
            ) == timedelta(seconds=WINDOW_SIZE - 1):
                self.valid_idxs.append(L) # valid idx에 L시점의 ts를 쭉 저장. 1~9461 저장(리스트형태로)
        self.valid_idxs = np.array(self.valid_idxs, dtype=np.int32)[::stride] # valid idx array 형태로 바꿔주기
        self.n_idxs = len(self.valid_idxs) # n_idxs 는 갯수.
        print(f"# of valid windows: {self.n_idxs}")
        if attacks is not None:
            self.attacks = np.array(attacks, dtype=np.float32) # attack이 있으면 self.attacks함수에 Attack 표시나게하고
            self.with_attack = True # with_attack함수로 boolean 검증
        else:
            self.with_attack = False

    def __len__(self):
        return self.n_idxs # n_idxs WIndow에서 왼쪽 bounday가 될 수 있는 ts 갯수.

    def __getitem__(self, idx): ## slicing을 구현하기 위한 메소드
        i = self.valid_idxs[idx] # Left boundary
        last = i + WINDOW_SIZE - 1 # Right boundary
        item = {"attack": self.attacks[last]} if self.with_attack else {} # with_attack이 true면 item에 attack 표기 (right boundary기준)
        item["ts"] = self.ts[i + WINDOW_SIZE - 1]
        item["given"] = torch.from_numpy(self.tag_values[i : i + WINDOW_GIVEN]) # left boundary부터, right bounday까지, 60개의 window boundary안에있는 모든 데이터 값.
        item["answer"] = torch.from_numpy(self.tag_values[last]) # given의 마지막 row 는 그 이전 스텝의 answer의 값과 동일하다.
        return item

우선 HaiDataset이라는 class는 Pytorch의 Dataset을 기반으로 만들어졌다.

pytorch.utils.data에 Dataset을 참조해서 Class를 생성한다.

시계열 데이터를 학습할 것이므로, WINDOW를 구성해서 project를 진행했다.

전체 코드에대한 설명은 주석처리를 해놨다.

WINDOW_SIZE = 60 을 설정한 이유는, 장 짧은 Anomaly 지속기간이 60과 근접한 숫자이기 때문이었다.

예를들어, 1~100까지의 time stamp가 있다고 가정하면, 1~60이 첫번째 window가 되는것이고, 41~100이 마지막 window가 된다. window를 이동해나가면서 Data를 분류하게 되고, 그 기준은 right boundary의 이상 유무를 기반으로 해당 window의 정상/비정상이 결정된다.

 

3. Modeling

N_HIDDENS = 256 # perceptron수
N_LAYERS = 3
BATCH_SIZE = 2048
n_epochs = 50

class StackedGRU(torch.nn.Module):
    def __init__(self, n_tags):
        super().__init__() # torch.nn.Module에 담겨있는 함수를 사용하겠다.
        self.rnn = torch.nn.GRU(
            input_size=n_tags,
            hidden_size=N_HIDDENS,# perceptron수
            num_layers=N_LAYERS,
            bidirectional=True, #양방향으로 할지 말지 forward + backward 두번씩 하니까 feature가 두배로 변함
            dropout=0,
            # batch_first = True # batch_first : 미니 배치 차원을 맨 앞으로 하여 데이터를 불러올 것인지 여부. (False가 기본값)
        )
        
        self.fc = torch.nn.Linear(N_HIDDENS * 2, n_tags)
        self.relu = torch.nn.ReLU()# ------------------- Check


    def forward(self, x):
        x = x.transpose(0, 1)  # (batch, seq, params) -> (seq, batch, params)
        self.rnn.flatten_parameters()
        outs, _ = self.rnn(x)
        out = self.fc(self.relu(outs[-1]))# ------------------- Check
        # out = self.fc(outs[-1])
        return x[0] + out

Model은 torch.nn에 있는 GRU모델을 썼고, LSTM과 고려해가면서 진행했다. relu 함수를 Activation Function으로 활용했다. 뭐 이래저래 많이 돌려서 Hyper Params는 저렇다.

Batch_size는 다들 알다시피 클수록 빠른 학습속도를 제공하기때문에, 크게 키우긴했는데 stackoverflow나 최근 facebook의 저명한 연구자들이 제안하는건, batch size를 32~128로 유지하라고 하긴한다.

다만, 이 project에서는 256 이상을 추천한다.(성능도 속도도 훨씬 나음)

 

4. Train

directory = "/content/gdrive/MyDrive/멋사_study/HAICon2021/model/model_W{}_B{}_P{}_D{}_E{}".format(WINDOW_SIZE,BATCH_SIZE,N_HIDDENS,N_LAYERS,n_epochs)

def train(dataset, model, batch_size, n_epochs):
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True) # Dataloader는 Mini Batch만들어 주는 역할. # 전체 데이터가 mini batch Size로 슬라이스됨. 
    optimizer = torch.optim.AdamW(model.parameters(), lr=0.001) # AdamW를 Optimizer로 선언
    loss_fn = torch.nn.L1Loss() #L1 Loss function 설정
    epochs = trange(n_epochs, desc="training")
    best = {"loss": sys.float_info.max} # float으로 설정할 수 있는 값중 최대값 지정
    loss_history = []
    lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, 0.997, verbose =False)
    for e in epochs:
        epoch_loss = 0
        for batch in dataloader:
            optimizer.zero_grad()
            given = batch["given"].cuda()
            guess = model(given)
            answer = batch["answer"].cuda()
            loss = loss_fn(answer, guess)
            loss.backward()
            epoch_loss += loss.item()
            optimizer.step()
        lr_scheduler.step()
        loss_history.append(epoch_loss)
        epochs.set_postfix_str(f"loss: {epoch_loss:.6f}")
        # val_loss = model.eval()
        if epoch_loss < best["loss"]:
            best["state"] = model.state_dict()
            best["loss"] = epoch_loss
            best["epoch"] = e + 1
        file_path = '/epoch{}_LOSS{:.6f}.pt'.format(e+1, epoch_loss)
        if not os.path.exists(directory):
            os.makedirs(directory)
        with open(directory+file_path, "wb") as f:
          torch.save({
              'epoch': e + 1,
              'model_state_dict': model.state_dict(),
              'optimizer_state_dict': optimizer.state_dict(),
              'loss': epoch_loss,
              "loss_history": loss_history
              }, f)
        print(f"epochs : {e+1:.0f}, loss : {epoch_loss:.4f}")
    return best, loss_history
    
# GPU 할당
MODEL = StackedGRU(n_tags=df_train.shape[1]) # input사이즈를 train data columns수로 하겠다는뜻
MODEL.cuda() # GPU 메모리 할당


# 학습
%%time
MODEL.train()
BEST_MODEL, LOSS_HISTORY = train(train_data, MODEL, BATCH_SIZE,n_epochs=n_epochs)

여기서는 torch.utils.data에 있는 Dataloader 모듈을 활용했다.

L1 loss fn선택한 이유는 inference과정과 관계가 있다. 

optimizer는 AdamW를 썼고, lr_scheduler를 Exponential LR로, 그리고 0.997의 곱할 값을 설정해놨다.

그리고, 매 학습마다 파일을 저장시키도록 했다. 어느 시점이 overfitting인지 검증하기 어려워서, 최대한 많이 돌리고 골라쓰는 무식한 방법을 채택했다. 

 

5. Inference

def inference(dataset, model, batch_size): # 추론
    dataloader = DataLoader(dataset, batch_size=batch_size)
    ts, dist, att = [], [], []
    with torch.no_grad():
        for batch in dataloader:
            given = batch["given"].cuda()
            answer = batch["answer"].cuda()
            guess = model(given)
            ts.append(np.array(batch["ts"]))
            dist.append(torch.abs(answer - guess).cpu().numpy()) # 예측값과 정답값의 차이를 기록
            try:
                att.append(np.array(batch["attack"]))
            except:
                att.append(np.zeros(batch_size))
            
    return (
        np.concatenate(ts),
        np.concatenate(dist),
        np.concatenate(att),
    )

given과 answer는 RNN모델을 생각하면 된다. ts는 time stamp를 저장하고, dist은 given을통해 예측한 값과, answer(데이터값)의 차이를 절대값으로 쭉쭉 더해준값이다. 마지막으로 att는 attack의 발생 유무를 말한다.

 

%%time
MODEL.eval() # nn.Module에서 train time과 eval time에서 수행하는 다른 작업을 수행할 수 있도록 switching 하는 함수라고 한다.
valid_CHECK_TS, valid_CHECK_DIST, valid_CHECK_ATT = inference(validation_data, MODEL, BATCH_SIZE)
# CHECK_DIST = 모든 시간대에서 전체 feature(field) 차이

valid_ANOMALY_SCORE = np.mean(valid_CHECK_DIST, axis=1)
# 그 차이의 평균을 스코어 지수로 표현
valid_ANOMALY_SCORE.mean()

딱히 설명할게 없을것같다. inference함수에 validation data를 넣고, 우리가 구한 모델과, 정해둔 Batch_size를 넣어준다.

그로부터 나오는 timestamp와, dist, attack 유무를 각각 의미한다.

 

def check_graph(xs, att, piece=2, THRESHOLD=None):# plot은 몇개 plot으로 그림 그려줄지 정하는거
    l = xs.shape[0]
    chunk = l // piece
    fig, axs = plt.subplots(piece, figsize=(20, 4 * piece))
    for i in range(piece):
        L = i * chunk
        R = min(L + chunk, l)
        xticks = range(L, R)
        axs[i].plot(xticks, xs[L:R])
        if len(xs[L:R]) > 0:
            peak = max(xs[L:R])
            axs[i].plot(xticks, att[L:R] * peak * 0.3)
        if THRESHOLD!=None:
            axs[i].axhline(y=THRESHOLD, color='r')
    plt.show()

그래프를 보면서 이야기를 하자

노란그래프가 validation data(얘만 유일하게 attack label이 있음!) 의 attack 유무를 표현해준거고, 우리가 train data를 통해 예측한 time stamp마다의 그림은 파란선이다. 그리고 Threshold는 빨간선으로 나오고 있다.

빨간선 위를 우리의 모델은 attack이라고 판단하니까 이를 for문을 통해서 찾아갔다.(이제 생각해보면, 최적화기법을 그냥 쓸걸그랬다.) 우리는 가장 적절한 Threshold를 찾아서 attack의 유무를 판정할꺼기 때문이다.

 

#정상은 0 비정상 1
def fill_blank(check_ts, labels, total_ts):
    def ts_generator():
        for t in total_ts:
            yield dateutil.parser.parse(t)

    def label_generator():
        for t, label in zip(check_ts, labels):
            yield dateutil.parser.parse(t), label

    g_ts = ts_generator()
    g_label = label_generator()
    final_labels = []

    try:
        current = next(g_ts)
        ts_label, label = next(g_label)
        while True:
            if current > ts_label:
                ts_label, label = next(g_label)
                continue
            elif current < ts_label:
                final_labels.append(0)
                current = next(g_ts)
                continue
            final_labels.append(label)
            current = next(g_ts)
            ts_label, label = next(g_label)
    except StopIteration:
        return np.array(final_labels, dtype=np.int8)

fill_blank 함수를 만들고,

 

LABELS = put_labels(valid_ANOMALY_SCORE, THRESHOLD)
ATTACK_LABELS = put_labels(np.array(raw_validation['attack']), threshold=THRESHOLD)


FINAL_LABELS = fill_blank(valid_CHECK_TS, LABELS, np.array(raw_validation['timestamp'])) # Labeling 진행
FINAL_LABELS.shape

Labeling을 진행했다. 

평가는 eTaPR 을 기준으로 진행됐고, 간단하게 Precision과 Recall값의 Trade-off속에서 조화평균의 max를 최종 모델로 선정하게된다.

 

TaPR = etapr.evaluate_haicon(anomalies=ATTACK_LABELS, predictions=FINAL_LABELS)
print(f"F1: {TaPR['f1']:.4f} (TaP: {TaPR['TaP']:.4f}, TaR: {TaPR['TaR']:.4f}),Threshold : {THRESHOLD:.5f}")
print(f"# of detected anomalies: {len(TaPR['Detected_Anomalies'])}")
print(f"Detected anomalies: {TaPR['Detected_Anomalies']}") # f1 score로 점수 받기

뭐 이런식으로  점수를 받아냈다. (eTaPR에 대한 질문보다는 설명이 되어있다. 맨위에 링크를 다시 가볼것을 권장함)

 

%%time
MODEL.eval()
CHECK_TS, CHECK_DIST, CHECK_ATT = inference(test_data, MODEL, BATCH_SIZE)

ANOMALY_SCORE = np.mean(CHECK_DIST, axis=1)

check_graph(ANOMALY_SCORE, CHECK_ATT, piece=8, THRESHOLD=THRESHOLD)

LABELS = put_labels(ANOMALY_SCORE, THRESHOLD)
LABELS, LABELS.shape

마지막으로, test data를 통과시키고 제출하게된다.(굳이 해당 코드는 올리지 않겠다.)

 

 

6. 후기

이론적 베이스가 있어야 할만한 프로젝트였다. 특히, Pytorch에 대한 활용까지도 처음이다보니까 Tensorflow때와 다른 불편함이 있었다. 

Hyper Parameter의 개선을 통해 좋은 모델을 만들어 내는 경진대회인지 아니면, 전처리/후처리를 기반으로 데이터의 품질을 높이는 경진대회인지 잘 모르겠다. 해봐야 가장 좋았던 성능이 f1 기준 0.36남짓이라서..ㅠㅠ

아무튼, 경진대회가 끝나면 수상코드를 많이 살펴봐야겠다. 어떤 이유로 성능이 안오르는지 너무 궁금해 미치겠다. (참고로 일주일 내내 팀원 4명이서 경우의수 될만한건 거의 돌려봤다.)

좋은성적보다는 이해를 하고, 해봤다는데 의의가 남는 프로젝트다.