Tensorflow와 numpy

Tensorflow와 numpy는 비슷하게 작동한다.

>>> import tensorflow as tf

>>> # 배열 생성
>>> t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

>>> # 인덱스 참조
>>> t[:, 1:]
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

>>> # 텐서 연산 
>>> t+10
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

>>> tf.square(t)
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

>>> t @ tf.transpose(t)
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

물론 모든 것이 numpy와 동일하지는 않다. 예를 들어, 전치 행렬의 경우, tensorflow에서는 tf.transpose(t), numpy에서는 t.T로 쓴다. 이는 tensorflow에서는 전치된 데이터의 복사본으로 새로운 텐서가 만들어지지만, 넘파이에서는 전치된 뷰일 뿐이기 때문이다.

텐서를 numpy 배열로, numpy 배열을 텐서로 바꾸는 것도 가능하다.

>>> import numpy as np
>>> a = np.array([2., 4., 5.])
>>> tf.constant(a)
<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

>>> t.numpy()
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

텐서플로는 타입 변환을 자동으로 수행하지 않는다. 즉, 다른 타입끼리 연산을 하려면 사용자가 tf.cast()로 직접 타입을 바꾸어야 한다.

t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

신경망에서는 역전파 결과에 따라 가중치를 수정해야 한다. 이를 위해서는 변경이 가능한 텐서인 tf.Variable을 사용해야 한다.

>>> v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])

>>> # tf.Variable 변형하기
>>> v.assign(2 * v)
>>> v[0, 1].assign(42)
>>> v[:, 2].assign([0., 1.])
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

assign을 이용해서 수정해야 하고, 인덱싱을 이용해 직접 수정하는 것은 지원하지 않는다.

이외에도 tensorflow는 희소 텐서, 문자열 텐서, 집합, 큐와 같은 다양한 데이터 타입을 지원한다.


사용자 정의 손실 함수

다음은 10장에서 살펴보았던 후버 손실을 구현한 것이다(사실 그냥 tf.keras.losses.Huber를 쓰면 된다.)

# huber loss
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

위 함수를 사용해서 모델을 훈련하고 저장하는 것은 문제가 발생하지 않는다. 하지만, 저장된 모델을 로드할 때는 함수 이름과 실제 함수를 매핑한 dictionary를 전달해야 한다(이름과 객체를 매핑해야 한다).

model = tf.keras.models.load_model('my_model_with_a_custom_loss.keras',
                                   custom_objects={'huber_fn': huber_fn})

만약 다른 기준이 필요해서 함수를 수정해야 한다면 다음과 같이 할 수 있다.

# when we need another criteria
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * tf.abs(error) - threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

model.compile(loss=create_huber(2.0), optimizer='nadam')

이렇게 하면 모델을 저장할 때 threshold 값은 저장되지 않는다. 따라서 로드할 때 지정해주어야 한다.

model = tf.keras.models.load_model(
    "my_model_with_a_custom_loss_threshold_2.keras",
    custom_objects={"huber_fn": create_huber(2.0)})

이 문제를 해결하고 싶다면 클래스를 상속하고 get_config() 메서드를 구현하면 된다.

class HuberLoss(tf.keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

model.compile(loss=HuberLoss(2.), optimizer='nadam')

모델을 로드할 때는 클래스 자체를 매핑해야 한다.


사용자 정의 활성화 함수, 초기화, 규제, 제한

다음과 같이 softplus, glorot 초기화, l1규제, 양수 가중치 제한을 직접 정의할 수 있다.

def my_softplus(z):
    return tf.math.log(1.0 + tf.exp(z))

def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights):
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

모델과 함께 하이퍼파라미터를 저장하려면 앞서 사용자 정의 손실 함수처럼 클래스를 정의해야 한다.

class MyL1Regularizer(tf.keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))

    def get_config(self):
        return {"factor": self.factor}


사용자 정의 지표

손실과 지표는 개념적으로 유사하다. 지표가 손실과 다른 점은 미분 가능하지 않거나 gradient가 0이어도 괜찮다는 점이다. 또한 지표는 이해하기 쉬워야 한다.

실제로 앞서 정의한 후버 손실을 그대로 지표로 사용해도 잘 작동한다.

model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

하지만 항상 이렇게 잘 작동하는 것은 아니다. 예를 들어, 이진 분류기 정밀도의 경우, 첫 번째 batch에서 80%(4/5), 두 번째 batch에서 0%(0/3)의 정밀도가 나왔을 때, 최종 정밀도는 둘의 평균인 40%가 아니라 $\frac{4+0}{5+3}=50\%$ 가 된다. 이를 해결하는 것이 Precision 클래스이다

>>> precision = tf.keras.metrics.Precision()
>>> precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])
>>> precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])

# 현재 지표 값
>>> precision.result()
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>


# 진짜 양성, 거짓 양성 기록한 변수 확인
>>> precision.variables
[<KerasVariable shape=(1,), dtype=float32, path=precision/true_positives>,
 <KerasVariable shape=(1,), dtype=float32, path=precision/false_positives>]

위 정보는 batch마다 점진적으로 업데이트되기 때문에 스트리밍 지표(streaming metric)이라고 한다.

사용자 정의 스트리밍 지표는 다음과 같이 만들 수 있다.

class HuberMetric(tf.keras.metrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer="zeros")      # 합 기록
        self.count = self.add_weight("count", initializer="zeros")      # 샘플 수 기록

    # batch 레이블과 예측을 바탕으로 변수 업데이트
    def update_state(self, y_true, y_pred, sample_weight=None):
        sample_metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(sample_metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    # 최종 결과 계산하고 반환
    def result(self):
        return self.total / self.count

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}


코드 보러가기



별도의 출처 표시가 있는 이미지를 제외한 모든 이미지는 강의자료에서 발췌하였음을 밝힙니다.

댓글남기기