diff --git a/server/topic.go b/server/topic.go index 881a28bc..fb0ecac4 100644 --- a/server/topic.go +++ b/server/topic.go @@ -28,6 +28,7 @@ func newTopic(id string, last time.Time) *topic { } } +// Subscribe subscribes to this topic func (t *topic) Subscribe(s subscriber) int { t.mu.Lock() defer t.mu.Unlock() @@ -37,24 +38,29 @@ func (t *topic) Subscribe(s subscriber) int { return subscriberID } +// Unsubscribe removes the subscription from the list of subscribers func (t *topic) Unsubscribe(id int) { t.mu.Lock() defer t.mu.Unlock() delete(t.subscribers, id) } +// Publish asynchronously publishes to all subscribers func (t *topic) Publish(m *message) error { - t.mu.Lock() - defer t.mu.Unlock() - t.last = time.Now() - for _, s := range t.subscribers { - if err := s(m); err != nil { - log.Printf("error publishing message to subscriber") + go func() { + t.mu.Lock() + defer t.mu.Unlock() + t.last = time.Now() + for _, s := range t.subscribers { + if err := s(m); err != nil { + log.Printf("error publishing message to subscriber") + } } - } + }() return nil } +// Subscribers returns the number of subscribers to this topic func (t *topic) Subscribers() int { t.mu.Lock() defer t.mu.Unlock()