import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Embedding, Bidirectional, LSTM
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from tensorflow.keras.losses import binary_crossentropy
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from keras.layers import Bidirectional, LSTM, Dense, Dropout
'''
# 레이블을 0과 1로 매핑
y_train = y_train.replace({'legitimate': 0, 'dga': 1})
y_test = y_test.replace({'legitimate': 0, 'dga': 1})
# LabelEncoder를 사용하여 레이블을 숫자로 인코딩
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)
# 인코딩된 레이블을 원-핫 인코딩
y_train_one_hot = to_categorical(y_train_encoded)
y_test_one_hot = to_categorical(y_test_encoded)
'''
# 학습 데이터 로드
data = pd.read_csv('dga_data.csv')
# 도메인을 숫자 시퀀스로 변환하는 함수
def domain_to_sequence(domain):
char_to_int = {char: i+1 for i, char in enumerate('abcdefghijklmnopqrstuvwxyz')}
sequence = [char_to_int.get(char, 0) for char in domain]
return sequence
# 도메인 데이터 전처리
sequences = [domain_to_sequence(domain) for domain in data['domain']]
padded_sequences = pad_sequences(sequences)
# 레이블 준비
labels = np.array(data['DGA'])
# 데이터셋 분할
X_train, X_test, y_train, y_test = train_test_split(padded_sequences, labels, test_size=0.2, random_state=42)
# 문자열 시퀀스의 최대 길이를 계산
max_length = max(data.apply(len))
# 시퀀스 패딩
data_padded = pad_sequences(data, maxlen=max_length)
model = Sequential()
model.add(Bidirectional(LSTM(128, return_sequences=True), input_shape=(max_length, 1)))
model.add(Dropout(0.2))
model.add(Bidirectional(LSTM(64)))
model.add(Dropout(0.2))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(np.expand_dims(X_train, axis=2), y_train, batch_size=32, epochs=10, validation_data=(np.expand_dims(X_test, axis=2), y_test))
'''
# 모델 생성
model = Sequential()
model.add(Embedding(input_dim=27, output_dim=32, input_length=padded_sequences.shape[1]))
model.add(Bidirectional(LSTM(64, return_sequences=True)))
model.add(Bidirectional(LSTM(32)))
model.add(Dense(1, activation='sigmoid'))
from sklearn.preprocessing import LabelEncoder
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)
from tensorflow.keras.utils import to_categorical
y_train_one_hot = to_categorical(y_train_encoded)
y_test_one_hot = to_categorical(y_test_encoded)
# 모델 컴파일
model.compile(loss='tf.keras.losses.binary_crossentropy', optimizer='adam', metrics=['accuracy'])
#model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
#loss_fn = tf.keras.losses.BinaryCrossentropy()
#model.compile(loss=loss_fn, optimizer='adam', metrics=['accuracy'])
import tensorflow as tf
# 모델 컴파일 시에 손실 함수 등록
model.compile(optimizer='adam', loss=tf.keras.losses.binary_crossentropy)
# 모델 학습
#model.fit(X_train, y_train, batch_size=64, epochs=10, validation_data=(X_test, y_test),
#custom_objects={'binary_crossentropy': tf.keras.losses.binary_crossentropy})
import tensorflow as tf
# 모델 로드
#model = tf.keras.models.load_model('model.h5', custom_objects={'binary_crossentropy': tf.keras.losses.binary_crossentropy})
# 모델 학습
model.fit(X_train, y_train, batch_size=64, epochs=10, validation_data=(X_test, y_test))
# 모델 학습
#model.fit(X_train, y_train, batch_size=64, epochs=10, validation_data=(X_test, y_test))
'''
# 모델 저장
model.save('dga_classifier.h5')
```
# 판별 결과 출력
if is_dga:
print("DGA 도메인입니다.")
else:
print("DGA 도메인이 아닙니다.")
```
메모리가 딸리고, 버전이 안맞아서 파이썬 3.7로 다운하고 코드 간결화시켜보았다.
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.sequence import pad_sequences
# 학습된 모델 불러오기
model = keras.models.load_model('dga_classifier.h5')
# DGA 도메인 판별을 위한 문자열 길이 설정
max_sequence_length = 50
# 도메인을 숫자 시퀀스로 변환하는 함수
def domain_to_sequence(domain):
char_to_int = {char: i+1 for i, char in enumerate('abcdefghijklmnopqrstuvwxyz')}
sequence = [char_to_int.get(char, 0) for char in domain]
return sequence
# 도메인을 이진 분류하는 함수
def classify_domain(domain):
# 도메인을 숫자 시퀀스로 변환
sequence = domain_to_sequence(domain)
# 시퀀스 패딩
padded_sequence = pad_sequences([sequence], maxlen=max_sequence_length)
# 예측
prediction = model.predict(padded_sequence)
is_dga = np.argmax(prediction)
return is_dga
# 판별할 도메인
domain = "abcdefghijklmnopqrstuvwxyzabcdefghij.kz"
is_dga = classify_domain(domain)
# 판별 결과 출력
if is_dga:
print("DGA 도메인입니다.")
else:
print("DGA 도메인이 아닙니다.")
버전을 낮춰서 사용해봤으나 여전히 버전에러가 뜨는 상황.. 열심히 찾아보고 결과가 바뀌길..