""" Tests for SubagentBus pub/sub message bus. """ import threading import pytest from agent.subagent import SubagentBus, SubagentManager # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def bus(): return SubagentBus() # --------------------------------------------------------------------------- # Tests: SubagentBus # --------------------------------------------------------------------------- class TestSubagentBus: def test_subscribe_and_publish(self, bus): received = [] bus.subscribe("ch1", lambda msg, sender: received.append((msg, sender))) bus.publish("ch1", "hello", "agent-1") assert received == [("hello", "agent-1")] def test_multiple_subscribers(self, bus): received = [] bus.subscribe("ch1", lambda msg, sender: received.append(("a", msg))) bus.subscribe("ch1", lambda msg, sender: received.append(("b", msg))) bus.publish("ch1", "ping", "sender") assert len(received) == 2 assert ("a", "ping") in received assert ("b", "ping") in received def test_publish_to_empty_channel(self, bus): # Should not raise bus.publish("nonexistent", "msg", "sender") def test_unsubscribe_all(self, bus): received = [] bus.subscribe("ch1", lambda msg, sender: received.append(msg)) bus.unsubscribe_all("ch1") bus.publish("ch1", "hello", "sender") assert received == [] def test_unsubscribe_all_nonexistent_channel(self, bus): # Should not raise bus.unsubscribe_all("nonexistent") def test_channels_are_isolated(self, bus): received_a = [] received_b = [] bus.subscribe("a", lambda msg, sender: received_a.append(msg)) bus.subscribe("b", lambda msg, sender: received_b.append(msg)) bus.publish("a", "only-a", "sender") assert received_a == ["only-a"] assert received_b == [] def test_callback_error_does_not_stop_others(self, bus): received = [] def bad_cb(msg, sender): raise RuntimeError("boom") bus.subscribe("ch1", bad_cb) bus.subscribe("ch1", lambda msg, sender: received.append(msg)) bus.publish("ch1", "hello", "sender") # Second callback should still fire assert received == ["hello"] def test_thread_safety(self, bus): """Concurrent subscribes and publishes should not crash.""" results = [] barrier = threading.Barrier(4) def subscriber(): barrier.wait() bus.subscribe("stress", lambda msg, sender: results.append(msg)) def publisher(): barrier.wait() bus.publish("stress", "msg", "sender") threads = [ threading.Thread(target=subscriber), threading.Thread(target=subscriber), threading.Thread(target=publisher), threading.Thread(target=publisher), ] for t in threads: t.start() for t in threads: t.join(timeout=5) # --------------------------------------------------------------------------- # Tests: SubagentManager.bus property # --------------------------------------------------------------------------- class TestSubagentManagerBus: def test_manager_has_bus(self): mgr = SubagentManager( runtime_factory=lambda: None, send_fn=lambda *a: None, ) assert isinstance(mgr.bus, SubagentBus) def test_bus_is_same_instance(self): mgr = SubagentManager( runtime_factory=lambda: None, send_fn=lambda *a: None, ) assert mgr.bus is mgr.bus