Skip to content

Commit

Permalink
Mqtt fixes (#222)
Browse files Browse the repository at this point in the history
This includes fixes to running the MQTT container with docker for a
distributed simulation.

For distributed simulations, one can start
`python examples/distributed_world_manager.py` in one terminal and run
`python examples/distributed_world_agent.py` in another one.

A registry of which agents are available (and must be online before
simulation can start) and a central yaml config are still needed.
So this is an early fix set, which should not change anything to
existing behavior

---------

Co-authored-by: Nick Harder <56074305+nick-harder@users.noreply.github.com>
  • Loading branch information
maurerle and nick-harder authored Oct 31, 2023
1 parent 606c335 commit e537ad6
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 168 deletions.
2 changes: 2 additions & 0 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,10 @@ def get_sum_reward(self):
f"select reward FROM rl_params where simulation='{self.simulation_id}'"
)

avg_reward = 0
with self.db.begin() as db:
reward = db.execute(query).fetchall()
if len(reward):
avg_reward = sum(r[0] for r in reward) / len(reward)

return avg_reward
5 changes: 4 additions & 1 deletion assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,10 @@ async def clear_market(self, market_products: list[MarketProduct]):

for meta in market_meta:
logger.debug(
f'clearing price for {self.marketconfig.name} is {meta["price"]:.2f}, volume: {meta["demand_volume"]}'
"clearing price for %s is %.2f, volume: %f",
self.marketconfig.name,
meta["price"],
meta["demand_volume"],
)
meta["market_id"] = self.marketconfig.name
meta["time"] = meta["product_start"]
Expand Down
19 changes: 9 additions & 10 deletions assume/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ async def setup(
learning_config: LearningConfig = {},
forecaster: Forecaster = None,
manager_address=None,
**kwargs,
):
self.clock = ExternalClock(0)
self.start = start
Expand Down Expand Up @@ -140,6 +141,7 @@ async def setup(
"broker_addr": "localhost",
"client_id": self.addr,
}
container_kwargs["mqtt_kwargs"].update(**kwargs)

self.container = await create_container(
connection_type=connection_type,
Expand All @@ -150,18 +152,15 @@ async def setup(
)
self.learning_mode = self.learning_config.get("learning_mode", False)
self.output_agent_addr = (self.addr, "export_agent_1")
if self.distributed_role is True:
if self.distributed_role is False:
self.clock_agent = DistributedClockAgent(self.container)
self.output_agent_addr = (manager_address, "export_agent_1")
else:
await self.setup_learning()
await self.setup_output_agent(simulation_id, save_frequency_hours)
self.clock_manager = DistributedClockManager(
self.container, receiver_clock_addresses=self.addresses
)
elif self.distributed_role is None:
await self.setup_learning()
await self.setup_output_agent(simulation_id, save_frequency_hours)
else:
self.clock_agent = DistributedClockAgent(self.container)
self.output_agent_addr = (manager_address, "export_agent_1")

async def setup_learning(self):
self.bidding_params.update(self.learning_config)
Expand Down Expand Up @@ -198,7 +197,7 @@ async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int

# mango multiprocessing is currently only supported on linux
# with single
if platform == "linux" and self.distributed_role is None:
if platform == "linux":
self.addresses.append(self.addr)

def creator(container):
Expand Down Expand Up @@ -380,7 +379,7 @@ def add_market(
self.markets[f"{market_config.name}"] = market_config

async def _step(self):
if self.distributed_role:
if self.distributed_role is not False:
next_activity = await self.clock_manager.distribute_time()
else:
next_activity = self.clock.get_next_activity()
Expand All @@ -404,7 +403,7 @@ async def async_run(self, start_ts, end_ts):

# allow registration before first opening
self.clock.set_time(start_ts - 1)
if self.distributed_role:
if self.distributed_role is not False:
await self.clock_manager.broadcast(self.clock.time)
while self.clock.time < end_ts:
await asyncio.sleep(0)
Expand Down
8 changes: 8 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ services:
profiles: ["mqtt"]
ports:
- "1883:1883/tcp"
volumes:
- ./docker_configs/mqtt.conf:/mosquitto/config/mosquitto.conf
healthcheck:
test: "mosquitto_sub -t '$$SYS/#' -C 1 | grep -v Error || exit 1"
interval: 45s
Expand All @@ -90,13 +92,19 @@ services:
depends_on:
- assume_db
- mqtt-broker
environment:
DB_URI: "postgresql://assume:assume@assume_db:5432/assume"
MQTT_BROKER: mqtt-broker
entrypoint: python3 ./examples/distributed_world_manager.py

simulation_client01:
container_name: simulation_client01
image: ghcr.io/assume-framework/assume:latest
profiles: ["mqtt"]
build: .
environment:
DB_URI: "postgresql://assume:assume@assume_db:5432/assume"
MQTT_BROKER: mqtt-broker
depends_on:
- assume_db
- mqtt-broker
Expand Down
5 changes: 5 additions & 0 deletions docker_configs/mqtt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# https://github.com/eclipse/mosquitto/blob/master/mosquitto.conf
listener 1883
allow_anonymous true

max_keepalive 3600
84 changes: 84 additions & 0 deletions examples/distributed_simulation/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import asyncio
import calendar
import logging
import os
from datetime import datetime, timedelta

import pandas as pd
from dateutil import rrule as rr

from assume import World
from assume.common.market_objects import MarketConfig, MarketProduct

log = logging.getLogger(__name__)


db_uri = os.getenv("DB_URI", "postgresql://assume:assume@localhost:5432/assume")

use_mqtt = False

if use_mqtt:
manager_addr = "manager"
agent_adress = "agent"
agent_adresses = ["agent"]
market_operator_addr = "manager"
else:
manager_addr = ("0.0.0.0", 9099)
agent_adress = ("0.0.0.0", 9098)
agent_adresses = [("0.0.0.0", 9098)]
market_operator_addr = ("0.0.0.0", 9099)

market_operator_aid = "market_operator"
broker_addr = os.getenv("MQTT_BROKER", ("0.0.0.0", 1883, 600))

start = datetime(2023, 10, 4)
end = datetime(2023, 12, 5)
index = pd.date_range(
start=start,
end=end + timedelta(hours=24),
freq="H",
)
sim_id = "handmade_simulation"

marketdesign = [
MarketConfig(
"EOM",
rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end),
timedelta(hours=1),
"pay_as_clear",
[MarketProduct(timedelta(hours=1), 24, timedelta(hours=1))],
additional_fields=["block_id", "link", "exclusive_id"],
)
]


async def worker(world: World, marketdesign: list[MarketConfig], create_worker):
if world.distributed_role:
world.addresses.extend(agent_adresses)

await world.setup(
start=start,
end=end,
save_frequency_hours=48,
simulation_id=sim_id,
index=index,
manager_address=manager_addr,
broker_addr=broker_addr,
)

await create_worker(world, marketdesign)

await asyncio.sleep(0)

# wait until done if we are a worker agent
if world.distributed_role:
world.logger.info("sleeping 2s")
await asyncio.sleep(2)
world.logger.info("starting simulation")
await world.async_run(
start_ts=calendar.timegm(world.start.utctimetuple()),
end_ts=calendar.timegm(world.end.utctimetuple()),
)
elif world.distributed_role is False:
await world.clock_agent.stopped
await world.container.shutdown()
38 changes: 38 additions & 0 deletions examples/distributed_simulation/world_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from config import (
agent_adress,
db_uri,
index,
market_operator_addr,
market_operator_aid,
marketdesign,
worker,
)

from assume import MarketConfig, World
from assume.common.forecasts import NaiveForecast


async def create_worker(world: World, marketdesign: list[MarketConfig]):
for market_config in marketdesign:
market_config.addr = market_operator_addr
market_config.aid = market_operator_aid
world.markets[f"{market_config.name}"] = market_config

world.add_unit_operator("my_demand")
world.add_unit(
"demand1",
"demand",
"my_demand",
# the unit_params have no hints
{
"min_power": 0,
"max_power": 1000,
"bidding_strategies": {"energy": "naive"},
"technology": "demand",
},
NaiveForecast(index, demand=100),
)


world = World(database_uri=db_uri, addr=agent_adress, distributed_role=False)
world.loop.run_until_complete(worker(world, marketdesign, create_worker))
40 changes: 40 additions & 0 deletions examples/distributed_simulation/world_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from config import (
agent_adress,
agent_adresses,
db_uri,
index,
manager_addr,
market_operator_addr,
market_operator_aid,
marketdesign,
worker,
)

from assume import MarketConfig, World
from assume.common.forecasts import NaiveForecast


async def create_worker(world: World, marketdesign: list[MarketConfig]):
world.add_market_operator(id=market_operator_aid)
for market_config in marketdesign:
world.add_market(market_operator_aid, market_config)

world.add_unit_operator("my_operator")

nuclear_forecast = NaiveForecast(index, availability=1, fuel_price=3, co2_price=0.1)
world.add_unit(
"nuclear1",
"power_plant",
"my_operator",
{
"min_power": 200,
"max_power": 1000,
"bidding_strategies": {"energy": "naive"},
"technology": "nuclear",
},
nuclear_forecast,
)


world = World(database_uri=db_uri, addr=manager_addr, distributed_role=True)
world.loop.run_until_complete(worker(world, marketdesign, create_worker))
78 changes: 0 additions & 78 deletions examples/distributed_world_agent.py

This file was deleted.

Loading

0 comments on commit e537ad6

Please sign in to comment.