이 코드를 이해하려면 앞선 policy iteration 포스트를 읽어주세요.
설명은 주석으로 해두었습니다.
Vπ(s) = E[Rt+1 + γVπ(St+1) | St = s]
아래의 과정을 코드로 표현한다.
- 처음에는 random policy(무작위 정책)로 정책을 설정한다.
- 모든 state에 대해서 정해진 업데이트 된 현재 policy를 통해 모든 state의 value값을 정하는 policy evaluation(정책 평가)를 한다.
- 모든 state의 value를 바탕으로 모든 state의 policy를 업데이트를 하는 policy improvement(정책 발전)을 한다.
- 2 ~ 3을 반복한다.
policy_iteration.py
5 x 5 grid world의 예제이다.
PolicyIeration 클래스의 초기화 부분을 보겠다
class PolicyIteration:
def __init__(self, env):
# 환경에 대한 객체 선언
self.env = env
# 가치함수를 2차원 리스트로 초기화 (25개)
self.value_table = [[0.0] * env.width for _ in range(env.height)]
# 상 하 좌 우 동일한 확률로 정책 초기화 (100개)
# 한 state 에 4가지 policy가 있기 때문
self.policy_table = [[[0.25, 0.25, 0.25, 0.25]] * env.width
for _ in range(env.height)]
# 마침 상태의 설정
# goal 인 지점(2, 2)
self.policy_table[2][2] = []
# 할인율
self.discount_factor = 0.9
policy_evaluation 함수에 대해서 보겠다.
# 벨만 기대 방정식을 통해 다음 가치함수를 계산하는 정책 평가
def policy_evaluation(self):
# 다음 가치함수 초기화
next_value_table = [[0.00] * self.env.width
for _ in range(self.env.height)]
# 모든 상태에 대해서 벨만 기대방정식을 계산
for state in self.env.get_all_states():
# 처음 value 는 일단 0으로 초기화
value = 0.0
# 마침 상태의 가치 함수 = 0
if state == [2, 2]:
next_value_table[state[0]][state[1]] = value
# 바로 다음 state로 간다
continue
# 벨만 기대 방정식
# 2, 2가 아닌 경우 현재 state에서 가능한 action 4가지를 모두 반복함
for action in self.env.possible_actions:
# 현재 반복중인 state에서 해당 action을 하면 다음 state는 무엇일지 next_state에 넣음
next_state = self.env.state_after_action(state, action)
# 현재 반복중인 state에서 해당 action을 하면 다음 state에 있는 reward가 얼마일지 reward에 넣음
reward = self.env.get_reward(state, action)
# next_state에 있는 현재 timestep의 value값을 next_value에다 넣음
next_value = self.get_value(next_state)
# 이게 벨만 기대 방정식, 이 반복문을 돌면서 각 action에 대한 q 값들을 모두 벨만 기대방정식에 의해 합해 주는 과정
value += (self.get_policy(state)[action] *
(reward + self.discount_factor * next_value))
# (state[0], state[1]) 지점의 state(현재 반복에서의 state)에 계산한 value를 next_value_table에넣어줌
next_value_table[state[0]][state[1]] = value
# 각 state에 해당되는 모든 next_value를 인스턴스 변수인 self.value_table에 업데이트를 시켜준다
self.value_table = next_value_table
policy_improvement 함수에 대해서 알아보겠다.
# 현재 가치 함수에 대해서 탐욕 정책 발전
def policy_improvement(self):
# 아까 __init__에서 초기화 해주었던 인스턴스 변수인 self.policy_table를 next_policy에다가 넣음
next_policy = self.policy_table
# 모든 state에 대해서 반복을 돌림
for state in self.env.get_all_states():
# 마침 상태가 현재 반복의 state라면 아무것도 안하고 continue를 통해 다음 state 로 반복을 이어간다.
if state == [2, 2]:
continue
value_list = []
# 반환할 정책 초기화
result = [0.0, 0.0, 0.0, 0.0]
# 모든 행동에 대해서 [보상 + (할인율 * 다음 상태 가치함수) = Qvalue] 계산
for index, action in enumerate(self.env.possible_actions):
# 현재 반복중인 state에서 해당 action을 하면 다음 state는 무엇일지 next_state에 넣음
next_state = self.env.state_after_action(state, action)
# 현재 반복중인 state에서 해당 action을 하면 다음 state에 있는 reward가 얼마일지 reward에 넣음
reward = self.env.get_reward(state, action)
# next_state에 있는 현재 timestep의 value값을 next_value에다 넣음
next_value = self.get_value(next_state)
# 이번엔 각 action에 대한 q value들을 value_list에다가 추가를 해준다.
value = reward + self.discount_factor * next_value
value_list.append(value)
# 받을 보상이 최대인 행동들에 대해 탐욕 정책 발전
# 지금 현재 반복중인 state에서의 모든 action에 대한 q값을 비교한후 가장 큰 값의 index를 max_idx_list에 넣음
max_idx_list = np.argwhere(value_list == np.amax(value_list))
max_idx_list = max_idx_list.flatten().tolist()
# 만약 그 q값들중 같은 값들이 있을때 확률이 같게 나누어 준다.
prob = 1 / len(max_idx_list)
# result에 각 action에 대한 policy를 넣어준다.
for idx in max_idx_list:
result[idx] = prob
# next_policy에다 현재 반복중인 state의 policy를 업데이트 해준다.
next_policy[state[0]][state[1]] = result
# 각 state에서 모든 action에 해당되는 next_policy를 인스턴스 변수인 self.policy_table에 업데이트를 시켜준다
self.policy_table = next_policy
제가 올린 글에서 잘못된 부분이 있으면 제 메일로 연락주세요!
Reference : https://github.com/rlcode/reinforcement-learning-kr-v2
이승수의 저작물인 이 저작물은(는) 크리에이티브 커먼즈 저작자표시-비영리-동일조건변경허락 4.0 국제 라이선스에 따라 이용할 수 있습니다.