This commit is contained in:
Karmanyaah Malhotra 2024-01-21 09:46:36 -08:00 committed by GitHub
commit 7d413a1785
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 104 additions and 49 deletions

View file

@ -117,9 +117,10 @@ var (
errHTTPBadRequestWebPushSubscriptionInvalid = &errHTTP{40038, http.StatusBadRequest, "invalid request: web push payload malformed", "", nil}
errHTTPBadRequestWebPushEndpointUnknown = &errHTTP{40039, http.StatusBadRequest, "invalid request: web push endpoint unknown", "", nil}
errHTTPBadRequestWebPushTopicCountTooHigh = &errHTTP{40040, http.StatusBadRequest, "invalid request: too many web push topic subscriptions", "", nil}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil}
errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication", nil}
errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication", nil}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil}
errHTTPNoSubscriberUnifiedPush = &errHTTP{40401, http.StatusInsufficientStorage, "cannot publish to UnifiedPush topic without previously active subscriber", "", nil}
errHTTPConflictUserExists = &errHTTP{40901, http.StatusConflict, "conflict: user already exists", "", nil}
errHTTPConflictTopicReserved = &errHTTP{40902, http.StatusConflict, "conflict: access control entry for topic or topic pattern already exists", "", nil}
errHTTPConflictSubscriptionExists = &errHTTP{40903, http.StatusConflict, "conflict: topic subscription already exists", "", nil}
@ -142,5 +143,4 @@ var (
errHTTPInternalErrorInvalidPath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid path", "", nil}
errHTTPInternalErrorMissingBaseURL = &errHTTP{50003, http.StatusInternalServerError, "internal server error: base-url must be be configured for this feature", "https://ntfy.sh/docs/config/", nil}
errHTTPInternalErrorWebPushUnableToPublish = &errHTTP{50004, http.StatusInternalServerError, "internal server error: unable to publish web push message", "", nil}
errHTTPInsufficientStorageUnifiedPush = &errHTTP{50701, http.StatusInsufficientStorage, "cannot publish to UnifiedPush topic without previously active subscriber", "", nil}
)

View file

@ -32,8 +32,8 @@ func testCacheMessages(t *testing.T, c *messageCache) {
require.Nil(t, c.AddMessage(m2))
// Adding invalid
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example", false))) // These should not be added!
// mytopic: count
counts, err := c.MessageCounts()

View file

@ -744,10 +744,9 @@ func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, e
}
if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
// UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
// the subscription as invalid if any 400-499 code (except 429/408) is returned.
// See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
// Rate-Topics header). The 404 response might remove the push subscription from application servers,
// but the client should resubscribe them when sent the new_topic parameter.
return nil, errHTTPNoSubscriberUnifiedPush.With(t)
} else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
return nil, errHTTPTooManyRequestsLimitMessages.With(t)
} else if email != "" && !vrate.EmailAllowed() {
@ -848,18 +847,12 @@ func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *
if err != nil {
minc(metricMessagesPublishedFailure)
minc(metricMatrixPublishedFailure)
if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
topic, err := fromContext[*topic](r, contextTopic)
if err != nil {
return err
}
if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPNoSubscriberUnifiedPush.HTTPCode {
pushKey, err := fromContext[string](r, contextMatrixPushKey)
if err != nil {
return err
}
if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
return writeMatrixResponse(w, pushKey)
}
return writeMatrixResponse(w, pushKey)
}
return err
}
@ -1225,8 +1218,11 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
createdNewTopics := false
subscriberIDs := make([]int, 0)
for _, t := range topics {
createdNewTopics = createdNewTopics || t.NeverSubscribed()
subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
}
defer func() {
@ -1234,7 +1230,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
if err := sub(v, newOpenMessage(topicsStr, createdNewTopics)); err != nil { // Send out open message
return err
}
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
@ -1374,8 +1370,11 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
}
return s.sendOldMessages(topics, since, scheduled, v, sub)
}
createdNewTopic := false
subscriberIDs := make([]int, 0)
for _, t := range topics {
createdNewTopic = createdNewTopic || t.NeverSubscribed()
subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
}
defer func() {
@ -1383,7 +1382,8 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
if err := sub(v, newOpenMessage(topicsStr, createdNewTopic)); err != nil { // Send out open message
return err
}
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {

View file

@ -92,7 +92,7 @@ func TestToFirebaseMessage_Keepalive(t *testing.T) {
}
func TestToFirebaseMessage_Open(t *testing.T) {
m := newOpenMessage("mytopic")
m := newOpenMessage("mytopic", false)
fbm, err := toFirebaseMessage(m, nil)
require.Nil(t, err)
require.Equal(t, "mytopic", fbm.Topic)

View file

@ -8,7 +8,6 @@ import (
"io"
"net/http"
"strings"
"time"
)
// Matrix Push Gateway / UnifiedPush / ntfy integration:
@ -72,14 +71,6 @@ type matrixResponse struct {
Rejected []string `json:"rejected"`
}
const (
// matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter is the time after which a Matrix response
// will return an HTTP 200 with the push key (i.e. "rejected":["<pushkey>"]}), if no rate visitor has been set on
// the topic. Rejecting the push key will instruct the Matrix server to invalidate the pushkey and stop sending
// messages to it. This must be longer than topicExpungeAfter. See https://spec.matrix.org/v1.6/push-gateway-api/
matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter = 12 * time.Hour
)
// errMatrixPushkeyRejected represents an error when handing Matrix gateway messages
//
// If the push key is set, the app server will remove it and will never send messages using the same

View file

@ -136,7 +136,7 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, "", messages[0].Message)
require.Equal(t, "new_topic", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
@ -147,6 +147,56 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
require.Equal(t, "", messages[1].Title)
require.Equal(t, 0, messages[1].Priority)
require.Nil(t, messages[1].Tags)
// The next time subscribing to the same topic will not result in new_topic on open
rr = httptest.NewRecorder()
ctx, cancel = context.WithCancel(context.Background())
req, err = http.NewRequestWithContext(ctx, "GET", "/mytopic/json", nil)
if err != nil {
t.Fatal(err)
}
go func() {
s.handle(rr, req)
doneChan <- true
}()
time.Sleep(300 * time.Millisecond)
cancel()
<-doneChan
messages = toMessages(t, rr.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, "", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
// Subscribing to any new topic again will result in new_topic being sent
rr = httptest.NewRecorder()
ctx, cancel = context.WithCancel(context.Background())
req, err = http.NewRequestWithContext(ctx, "GET", "/mytopic,topic2/json", nil)
if err != nil {
t.Fatal(err)
}
go func() {
s.handle(rr, req)
doneChan <- true
}()
time.Sleep(300 * time.Millisecond)
cancel()
<-doneChan
messages = toMessages(t, rr.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic,topic2", messages[0].Topic)
require.Equal(t, "new_topic", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
}
func TestServer_PublishAndSubscribe(t *testing.T) {
@ -1456,8 +1506,8 @@ func TestServer_MatrixGateway_Push_Failure_NoSubscriber(t *testing.T) {
s := newTestServer(t, c)
notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}`
response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
require.Equal(t, 507, response.Code)
require.Equal(t, 50701, toHTTPError(t, response.Body.String()).Code)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`+"\n", response.Body.String())
}
func TestServer_MatrixGateway_Push_Failure_NoSubscriber_After13Hours(t *testing.T) {
@ -1466,16 +1516,16 @@ func TestServer_MatrixGateway_Push_Failure_NoSubscriber_After13Hours(t *testing.
s := newTestServer(t, c)
notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}`
// No success if no rate visitor set (this also creates the topic in memory)
// Simply reject if no rate visitor set (this creates the topic in memory)
response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
require.Equal(t, 507, response.Code)
require.Equal(t, 50701, toHTTPError(t, response.Body.String()).Code)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`, strings.TrimSpace(response.Body.String()))
require.Nil(t, s.topics["mytopic"].rateVisitor)
// Fake: This topic has been around for 13 hours without a rate visitor
s.topics["mytopic"].lastAccess = time.Now().Add(-13 * time.Hour)
// Same request should now return HTTP 200 with a rejected pushkey
// Same request should still return an HTTP 200 with a rejected pushkey
response = request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`, strings.TrimSpace(response.Body.String()))

View file

@ -19,11 +19,12 @@ const (
// topic represents a channel to which subscribers can subscribe, and publishers
// can publish a message
type topic struct {
ID string
subscribers map[int]*topicSubscriber
rateVisitor *visitor
lastAccess time.Time
mu sync.RWMutex
ID string
subscribers map[int]*topicSubscriber
rateVisitor *visitor
lastAccess time.Time
neverSubscribed bool
mu sync.RWMutex
}
type topicSubscriber struct {
@ -38,9 +39,10 @@ type subscriber func(v *visitor, msg *message) error
// newTopic creates a new topic
func newTopic(id string) *topic {
return &topic{
ID: id,
subscribers: make(map[int]*topicSubscriber),
lastAccess: time.Now(),
ID: id,
subscribers: make(map[int]*topicSubscriber),
lastAccess: time.Now(),
neverSubscribed: true,
}
}
@ -61,6 +63,7 @@ func (t *topic) Subscribe(s subscriber, userID string, cancel func()) (subscribe
cancel: cancel,
}
t.lastAccess = time.Now()
t.neverSubscribed = false
return subscriberID
}
@ -79,6 +82,12 @@ func (t *topic) LastAccess() time.Time {
return t.lastAccess
}
func (t *topic) NeverSubscribed() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.neverSubscribed
}
func (t *topic) SetRateVisitor(v *visitor) {
t.mu.Lock()
defer t.mu.Unlock()

View file

@ -13,10 +13,11 @@ import (
// List of possible events
const (
openEvent = "open"
keepaliveEvent = "keepalive"
messageEvent = "message"
pollRequestEvent = "poll_request"
openEvent = "open"
openCreatedNewTopic = "new_topic"
keepaliveEvent = "keepalive"
messageEvent = "message"
pollRequestEvent = "poll_request"
)
const (
@ -123,8 +124,12 @@ func newMessage(event, topic, msg string) *message {
}
// newOpenMessage is a convenience method to create an open message
func newOpenMessage(topic string) *message {
return newMessage(openEvent, topic, "")
func newOpenMessage(topic string, createdNewTopics bool) *message {
msg := ""
if createdNewTopics { // can expand this to a comma seperated string for more future parameters
msg = openCreatedNewTopic
}
return newMessage(openEvent, topic, msg)
}
// newKeepaliveMessage is a convenience method to create a keepalive message