diff --git a/server/server_account_test.go b/server/server_account_test.go index bb289a1a..3e14d51f 100644 --- a/server/server_account_test.go +++ b/server/server_account_test.go @@ -686,88 +686,6 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) { require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, m2.ID)) } -func TestAccount_Reservation_Add_Kills_Other_Subscribers(t *testing.T) { - t.Parallel() - conf := newTestConfigWithAuthFile(t) - conf.AuthDefault = user.PermissionReadWrite - conf.EnableSignup = true - s := newTestServer(t, conf) - defer s.closeDatabases() - - // Create user with tier - rr := request(t, s, "POST", "/v1/account", `{"username":"phil", "password":"mypass"}`, nil) - require.Equal(t, 200, rr.Code) - - require.Nil(t, s.userManager.AddTier(&user.Tier{ - Code: "pro", - MessageLimit: 20, - ReservationLimit: 2, - })) - require.Nil(t, s.userManager.ChangeTier("phil", "pro")) - - // Subscribe anonymously - anonCh, userCh := make(chan bool), make(chan bool) - go func() { - rr := request(t, s, "GET", "/mytopic/json", ``, nil) // This blocks until it's killed! - require.Equal(t, 200, rr.Code) - messages := toMessages(t, rr.Body.String()) - require.Equal(t, 2, len(messages)) // This is the meat. We should NOT receive the second message! - require.Equal(t, "open", messages[0].Event) - require.Equal(t, "message before reservation", messages[1].Message) - anonCh <- true - log.Info("Anonymous subscription ended") - }() - - // Subscribe with user - go func() { - rr := request(t, s, "GET", "/mytopic/json", ``, map[string]string{ // Blocks! - "Authorization": util.BasicAuth("phil", "mypass"), - }) - require.Equal(t, 200, rr.Code) - messages := toMessages(t, rr.Body.String()) - require.Equal(t, 3, len(messages)) - require.Equal(t, "open", messages[0].Event) - require.Equal(t, "message before reservation", messages[1].Message) - require.Equal(t, "message after reservation", messages[2].Message) - userCh <- true - log.Info("User subscription ended") - }() - - // Publish message (before reservation) - time.Sleep(2 * time.Second) // Wait for subscribers - rr = request(t, s, "POST", "/mytopic", "message before reservation", nil) - require.Equal(t, 200, rr.Code) - time.Sleep(2 * time.Second) // Wait for subscribers to receive message - - // Reserve a topic - rr = request(t, s, "POST", "/v1/account/reservation", `{"topic": "mytopic", "everyone":"deny-all"}`, map[string]string{ - "Authorization": util.BasicAuth("phil", "mypass"), - }) - require.Equal(t, 200, rr.Code) - - // Everyone but phil should be killed - select { - case <-anonCh: - case <-time.After(5 * time.Second): - t.Fatal("Waiting for anonymous subscription to be killed failed") - } - - // Publish a message - rr = request(t, s, "POST", "/mytopic", "message after reservation", map[string]string{ - "Authorization": util.BasicAuth("phil", "mypass"), - }) - require.Equal(t, 200, rr.Code) - - // Kill user Go routine - s.topics["mytopic"].CancelSubscribers("") - - select { - case <-userCh: - case <-time.After(5 * time.Second): - t.Fatal("Waiting for user subscription to be killed failed") - } -} - func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) { t.Parallel() conf := newTestConfigWithAuthFile(t) @@ -795,7 +713,7 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) { require.Equal(t, 200, rr.Code) // Wait for stats queue writer - time.Sleep(500 * time.Millisecond) + time.Sleep(600 * time.Millisecond) // Verify that message stats were persisted u, err := s.userManager.User("phil") @@ -818,7 +736,7 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) { require.Equal(t, 200, rr.Code) // Verify that message stats were persisted - time.Sleep(500 * time.Millisecond) + time.Sleep(600 * time.Millisecond) u, err = s.userManager.User("phil") require.Nil(t, err) require.Equal(t, int64(2), u.Stats.Messages) // v.EnqueueUserStats had run! @@ -830,5 +748,4 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) { require.Equal(t, 200, rr.Code) account, _ = util.UnmarshalJSON[apiAccountResponse](io.NopCloser(rr.Body)) require.Equal(t, int64(2), account.Stats.Messages) // Is not reset! - } diff --git a/server/topic_test.go b/server/topic_test.go new file mode 100644 index 00000000..cab2918a --- /dev/null +++ b/server/topic_test.go @@ -0,0 +1,30 @@ +package server + +import ( + "github.com/stretchr/testify/require" + "sync/atomic" + "testing" +) + +func TestTopic_CancelSubscribers(t *testing.T) { + t.Parallel() + + subFn := func(v *visitor, msg *message) error { + return nil + } + canceled1 := atomic.Bool{} + cancelFn1 := func() { + canceled1.Store(true) + } + canceled2 := atomic.Bool{} + cancelFn2 := func() { + canceled2.Store(true) + } + to := newTopic("mytopic") + to.Subscribe(subFn, "", cancelFn1) + to.Subscribe(subFn, "u_phil", cancelFn2) + + to.CancelSubscribers("u_phil") + require.True(t, canceled1.Load()) + require.False(t, canceled2.Load()) +}