from os import getcwd, chdir
from time import sleep
from random import randint
from block2d import Block2d
from pyray import *
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from gymnasium.utils.env_checker import check_env
from stable_baselines3 import PPO, A2C

screen_width = 800
screen_height = 600
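

# A Gymnasium environment around the Block2d toy: the observation holds the
# block and goal positions, the action is a click position handed to
# Block2d.control(), and an episode ends when Block2d.is_win_condition()
# reports success. Rendering goes through raylib (pyray), training through
# Stable-Baselines3.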
class BlockGameEnvironment(gym.Env):
    metadata = {
        "render_modes": ["human"],
        "render_fps": 20,
    }

    def __init__(self, render_mode=None):
        #super().__init__()
        # Raylib
        init_window(screen_width, screen_height, "Physics Simulation")
        self.frame_counter = 0
        self.content = Block2d()
        # Action space
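        # Layout: [should-move trigger, click x, click y]; with the normalized
        # variant every component lives in [-1, 1] and step() rescales the
        # click back to pixel coordinates.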
        # with normalization
        self.action_space = spaces.Box(
            low=np.array([-1, -1, -1]),
            high=np.array([1, 1, 1]),
            dtype=np.float32
        )
        # without normalization
        #self.action_space = spaces.Box(
        #    low=np.array([0, 0, 0]),
        #    high=np.array([screen_width, screen_width, screen_height]),
        #    dtype=np.float32
        #)
        # Observation space
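        # Layout: [box x, box y, goal x, goal y, 1, 1]; the two trailing
        # constants are placeholders for the (currently disabled) speed
        # components.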
        self.observation_space = spaces.Box(
            low=np.array([0] * 6),
            high=np.array([
                screen_width,
                screen_height,
                screen_width,
                screen_height,
                #self.content.max_speed,
                #self.content.max_speed,
                1,
                1,
            ]),
            dtype=np.float64
        )

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        # Reconstruct the game
        del self.content
        self.box_start = (randint(0, screen_width), randint(0, screen_height))
        self.goal_position = (randint(0, screen_width), randint(0, screen_height))
        self.content = Block2d(
            Vector2(*self.box_start),
            Vector2(*self.goal_position),
        )
        obs = np.array([
            self.content.position.x,
            self.content.position.y,
            self.content.goal_position.x,
            self.content.goal_position.y,
            #self.content.speed.x,
            #self.content.speed.y,
            1,
            1,
        ])
        return obs, {}

    def step(self, action):
        def calc_reward(is_done: bool) -> float:
            # We have many strategies to reward the model,
            # one worse than the other.
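
            # Big bonus on success, a proximity bonus within 100 px, and a tiny
            # penalty otherwise (ten times larger when the block is standing
            # still). Note that it measures proximity against the raw action
            # values rather than the block's position.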
            def keep_moving_large_proximity_big_earner():
                if is_done: return 100
                diff = (
                    abs(self.content.goal_position.x - action[0]),
                    abs(self.content.goal_position.y - action[1]),
                )
                if diff[0] < 100 and diff[1] < 100:
                    return 1 / (diff[0] + diff[1])
                if self.content.speed.x == 0 and self.content.speed.y == 0:
                    return -0.001
                return -0.0001
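
            # Dense shaping: subtracts the distance-to-goal components (scaled
            # by 1/1000) and the sum of the speed components (scaled by 1/1200).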
            def precise_proximity_punisher():
                r = 0
                diffs = (
                    self.content.goal_position.x - self.content.position.x,
                    self.content.goal_position.y - self.content.position.y
                )
                diffs = (abs(diffs[0]), abs(diffs[1]))
                speed_sum = self.content.speed.x + self.content.speed.y
                r -= (diffs[0] + diffs[1]) / 1000
                r -= speed_sum / 1200
                return r
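
            # Big bonus on success, +1 while moving slowly within 50 px of the
            # goal, and a tiny penalty the rest of the time.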
            def take_it_slow_large_proximity_big_earner():
                if is_done: return 100
                elif (
                    self.content.speed.x < 100 and
                    self.content.speed.y < 100 and
                    abs(self.content.position.x - self.content.goal_position.x) < 50 and
                    abs(self.content.position.y - self.content.goal_position.y) < 50
                ): return 1
                else: return -0.0001
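
            # Pure distance penalty: the further from the goal, the more
            # negative the reward.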
            def just_move_near_it_im_begging_you():
                r = 0
                diffs = (
                    self.content.goal_position.x - self.content.position.x,
                    self.content.goal_position.y - self.content.position.y
                )
                dist = np.sqrt(diffs[0]**2 + diffs[1]**2)
                r = dist / 10_000
                return -r
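
            # Same idea, but the distance is normalized by the screen diagonal
            # so the penalty stays in [-1, 0], plus a +10 bonus on reaching the
            # goal. This is the strategy actually used below.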
            def just_move_near_it_im_begging_you_normalized():
                if is_done: return 10
                r = 0
                max_dist = np.sqrt(screen_width**2 + screen_height**2)
                diffs = (
                    self.content.goal_position.x - self.content.position.x,
                    self.content.goal_position.y - self.content.position.y
                )
                dist = np.sqrt(diffs[0]**2 + diffs[1]**2)
                r = dist / max_dist
                return -r
            return just_move_near_it_im_begging_you_normalized()
        # Undo the action normalization: (optionally) threshold the move
        # trigger and rescale the click position to pixel coordinates.
        #should_move = action[0] > 0.8
        click = Vector2(action[1], action[2])
        click.x = ((click.x + 1) * screen_width) / 2
        click.y = ((click.y + 1) * screen_height) / 2
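        # e.g. action[1] == 0.0 maps to x = ((0 + 1) * 800) / 2 = 400, the
        # middle of the screen; -1.0 maps to 0 and 1.0 to 800.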
        should_move = True
        #click = Vector2(action[1], action[2])
        if should_move:
            self.content.control(click)
        self.content.update()
        obs = np.array([
            self.content.position.x,
            self.content.position.y,
            self.content.goal_position.x,
            self.content.goal_position.y,
            #self.content.speed.x,
            #self.content.speed.y,
            1,
            1,
        ])
        is_done = self.content.is_win_condition()
        reward = calc_reward(is_done)
        # printf debugging
        #print(f"obs: {obs}")
        #print(f"act: {action}")
        #print(f"rew: {reward}")
        return obs, reward, is_done, False, {}

    def render(self, mode="human"):
        begin_drawing()
        clear_background(RAYWHITE)
        self.content.display()
        end_drawing()
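        # Save the first 400 frames as PNGs (useful e.g. for stitching a video
        # together), then just throttle the loop slightly.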
        if self.frame_counter < 400: take_screenshot(f"frame_{self.frame_counter:05d}.png")
        else: sleep(0.01)
        self.frame_counter += 1

    def close(self):
        close_window()


# Env init
gym.envs.registration.register(
    id="BlockGameEnvironment-v0",
    entry_point=__name__ + ":BlockGameEnvironment",
)
env = gym.make("BlockGameEnvironment-v0", render_mode="human")
# A very useful check; however, it clones the environment,
# which is bad in our case because raylib uses global data.
#check_env(env.unwrapped)


# Model init
def get_mymodel(name: str, do_train: bool):
    model_name = "custom_model_-_sorta_works"
    model = None
    match name:
case "myPPO":
model = PPO(
"MlpPolicy",
env,
learning_rate=0.001,
ent_coef=0.1,
batch_size=2, # NOTE: this is why we dont marge the two cases
verbose=1,
tensorboard_log="logs/",
)
case "myA2C":
model = A2C( # really, really fast to train
"MlpPolicy",
env,
verbose=1,
tensorboard_log="logs/",
)
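    # Either train from scratch (5000 successive learn(100) calls, saving once
    # at the end) or load the previously saved weights for the chosen algorithm.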
    if do_train:
        for i in range(5000): model.learn(100)
        model.save(model_name)
    else:
        match name:
            case "myPPO":
                model = PPO.load(model_name, env=env)
            case "myA2C":
                model = A2C.load(model_name, env=env)
    return model
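

# Pick the algorithm and whether to (re)train or load the saved model by
# toggling the commented-out arguments.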
model = get_mymodel(
    # "myPPO",
    "myA2C",
    # True,
    False,
)

# Show what the model learned
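# Run episodes indefinitely: reset, act on the model's predictions (stochastic
# by default), render every frame, and end an episode on a win or after 1000
# steps.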
while True:
    obs, _ = env.reset()
    for _ in range(1000):
        action, _ = model.predict(obs)
        obs, _, done, _, _ = env.step(action)
        print(action)
        env.render()
        if done: break