From 3334d8486183efcd0e5e70bf067931059fe3c52e Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Wed, 22 Jun 2022 15:11:50 -0400 Subject: [PATCH] Fix another race, add test --- server/server.go | 5 ++++- server/server_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index e79118b3..579c9f99 100644 --- a/server/server.go +++ b/server/server.go @@ -1165,8 +1165,11 @@ func (s *Server) updateStatsAndPrune() { } // Print stats + s.mu.Lock() + messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors) + s.mu.Unlock() log.Info("Stats: %d messages published, %d in cache, %d topic(s) active, %d subscriber(s), %d visitor(s), %d mails received (%d successful, %d failed), %d mails sent (%d successful, %d failed)", - s.messages, messages, len(s.topics), subscribers, len(s.visitors), + messagesCount, messages, topicsCount, subscribers, visitorsCount, receivedMailTotal, receivedMailSuccess, receivedMailFailure, sentMailTotal, sentMailSuccess, sentMailFailure) } diff --git a/server/server_test.go b/server/server_test.go index 9fc9aa88..54fef13c 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "log" "math/rand" "net/http" "net/http/httptest" @@ -1409,6 +1410,41 @@ func TestServer_Visitor_XForwardedFor_Multiple(t *testing.T) { require.Equal(t, "234.5.2.1", v.ip) } +func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) { + count := 1000 + s := newTestServer(t, newTestConfig(t)) + + // Add lots of messages + log.Printf("Adding %d messages", count) + start := time.Now() + for i := 0; i < count; i++ { + require.Nil(t, s.messageCache.AddMessage(newDefaultMessage(fmt.Sprintf("topic%d", i), "some message"))) + } + log.Printf("Done: Adding %d messages; took %s", count, time.Since(start).Round(time.Millisecond)) + + // Update stats + statsChan := make(chan bool) + go func() { + log.Printf("Updating stats") + start := time.Now() + s.updateStatsAndPrune() + log.Printf("Done: Updating stats; took %s", time.Since(start).Round(time.Millisecond)) + statsChan <- true + }() + time.Sleep(50 * time.Millisecond) // Make sure it starts first + + // Publish message (during stats update) + log.Printf("Publishing message") + start = time.Now() + response := request(t, s, "PUT", "/mytopic", "some body", nil) + m := toMessage(t, response.Body.String()) + require.Equal(t, "some body", m.Message) + require.True(t, time.Since(start) < 500*time.Millisecond) + log.Printf("Done: Publishing message; took %s", time.Since(start).Round(time.Millisecond)) + + <-statsChan +} + func newTestConfig(t *testing.T) *Config { conf := NewConfig() conf.BaseURL = "http://127.0.0.1:12345"