mark: A photo of Mark kneeling on top of the Taal Volcano in the Philippines. It was a long hike. (Default)
Mark Smith ([staff profile] mark) wrote in [site community profile] changelog, 2009-09-03 09:58 pm

[dw-free] Round 2 of PubSubHubbub (Fred) subscription support. This enables feeds to refer to multiple URLs [...]

[commit: http://hg.dwscoalition.org/dw-free/rev/b2c0b0b8bf11]

Round 2 of PubSubHubbub (Fred) subscription support. This enables feeds to
refer to multiple URLs; this also makes us subscribe using the URL we got
the feed from as well as the self URL. Adds the framework to support ageing
out subscriptions and eventually killing them. Much more robust and
generally useful.

Patch by [staff profile] mark.

Files modified:
  • bin/upgrading/update-db-general.pl
  • bin/worker/subscribe-hubbub
  • cgi-bin/LJ/SynSuck.pm
  • cgi-bin/ljlib.pl
  • cgi-bin/parsefeed.pl
  • htdocs/misc/hubbub.bml
--------------------------------------------------------------------------------
diff -r b7370be6d607 -r b2c0b0b8bf11 bin/upgrading/update-db-general.pl
--- a/bin/upgrading/update-db-general.pl	Thu Sep 03 17:21:35 2009 +0000
+++ b/bin/upgrading/update-db-general.pl	Thu Sep 03 21:58:25 2009 +0000
@@ -974,6 +974,7 @@ register_tabledrop("adopt");
 register_tabledrop("adopt");
 register_tabledrop("adoptlast");
 register_tabledrop("urimap");
+register_tabledrop("syndicated_hubbub");
 
 register_tablecreate("portal", <<'EOC');
 CREATE TABLE portal (
@@ -1388,18 +1389,22 @@ CREATE TABLE syndicated (
 )
 EOC
 
-register_tablecreate("syndicated_hubbub", <<'EOC');
-CREATE TABLE syndicated_hubbub (
+register_tablecreate("syndicated_hubbub2", <<'EOC');
+CREATE TABLE syndicated_hubbub2 (
+    id INT UNSIGNED NOT NULL,
     userid INT UNSIGNED NOT NULL,
-    huburl VARCHAR(255),
-    topicurl VARCHAR(255),
+    huburl VARCHAR(255) NOT NULL,
+    topicurl VARCHAR(255) NOT NULL,
+    verifytoken VARCHAR(64) NOT NULL,
+    lastseenat INT UNSIGNED NOT NULL,
     leasegoodto INT UNSIGNED,
-    verifytoken VARCHAR(64),
 
+    UNIQUE (userid, huburl, topicurl),
     UNIQUE (verifytoken),
-    UNIQUE (topicurl),
+    INDEX (topicurl),
     INDEX (leasegoodto),
-    PRIMARY KEY (userid)
+    INDEX (lastseenat),
+    PRIMARY KEY (id)
 )
 EOC
 
diff -r b7370be6d607 -r b2c0b0b8bf11 bin/worker/subscribe-hubbub
--- a/bin/worker/subscribe-hubbub	Thu Sep 03 17:21:35 2009 +0000
+++ b/bin/worker/subscribe-hubbub	Thu Sep 03 21:58:25 2009 +0000
@@ -55,8 +55,8 @@ sub work {
 
     # find tasks that need doing that are owned by users that are visible
     my $rows = $dbh->selectall_arrayref(
-        q{SELECT s.userid, s.huburl, s.topicurl, s.verifytoken, s.leasegoodto
-          FROM user u, syndicated_hubbub s
+        q{SELECT s.id, s.userid, s.huburl, s.topicurl, s.verifytoken, s.leasegoodto
+          FROM user u, syndicated_hubbub2 s
           WHERE u.userid = s.userid AND u.statusvis = 'V'
             AND (s.leasegoodto IS NULL OR s.leasegoodto < (UNIX_TIMESTAMP() + 1800))
           LIMIT 500}
@@ -65,7 +65,7 @@ sub work {
 
     # iterate over tasks
     foreach my $row ( @$rows ) {
-        my ( $userid, $huburl, $topicurl, $verifytoken, $leasegoodto ) = @$row;
+        my ( $id, $userid, $huburl, $topicurl, $verifytoken, $leasegoodto ) = @$row;
         my $u = LJ::load_userid( $userid ) or next;
 
         print "[$$] Subscribing to " . $u->user . "(" . $u->id . ")\n";
@@ -85,12 +85,18 @@ sub work {
         # no matter what happens, bump the lease time artificially so
         # we won't retry every time this loop runs.  if the hub verifies
         # our subscription, the lease time will be set properly.
-        $dbh->do( 'UPDATE syndicated_hubbub SET leasegoodto = UNIX_TIMESTAMP() + ?',
-                  undef, int( rand() * 3600 ) + 3600 );  # 1-2 hours random
+        $dbh->do( 'UPDATE syndicated_hubbub2 SET leasegoodto = UNIX_TIMESTAMP() + ? WHERE id = ?',
+                  undef, int( rand() * 3600 ) + 3600, $id );  # 1-2 hours random
 
         # be verbose
         print "             == " . $res->code . " " . $res->message . "\n";
     }
+
+    # now the other half of life: removing stale subscriptions.  we keep track of every
+    # time we see a feed reference a (userid, huburl, topicurl) combination.  if one of those
+    # stops getting referenced for some period of time (24 hours?) then we should consider
+    # that subscription to be invalid.  (and in fact, we should unsubscribe.)
+    # FIXME: implement this ;)
 }
 
 
diff -r b7370be6d607 -r b2c0b0b8bf11 cgi-bin/LJ/SynSuck.pm
--- a/cgi-bin/LJ/SynSuck.pm	Thu Sep 03 17:21:35 2009 +0000
+++ b/cgi-bin/LJ/SynSuck.pm	Thu Sep 03 21:58:25 2009 +0000
@@ -158,9 +158,18 @@ sub process_content {
         return;
     }
 
-    # register feeds that can support hubbub
+    # register feeds that can support hubbub.
     if ( LJ::is_enabled( 'hubbub' ) && $feed->{self} && $feed->{hub} ) {
-        register_hubbub_feed( $userid, $feed->{self}, $feed->{hub} );
+        # this is a square operation.  register every "self" and the feed url along
+        # with the $synurl with every hub they say they talk to.  but for sanity,
+        # don't let them force us onto more than 10 subscriptions...
+        my $ct = 0;
+        foreach my $topic ( $synurl, @{ $feed->{self} } ) {
+            foreach my $hub ( @{ $feed->{hub} } ) {
+                last if $ct++ >= 10;
+                register_hubbub_feed( $userid, $topic, $hub );
+            }
+        }
     }
 
     # another sanity check
@@ -463,24 +472,29 @@ sub register_hubbub_feed {
     my $dbh = LJ::get_db_writer() or return;
     my $u = LJ::load_userid( $uid ) or return;
 
-    # if it exists, we need to know if we should be changing the record.  i.e., if the
-    # feed changes its topic url, or hub, then we want to update this record so that the
-    # subscription system picks up on it.
+    # since we can have many records for a given feed, we want to see if we have an exact
+    # match for this particular combination.
     my $row = $dbh->selectrow_hashref(
-        q{SELECT huburl, topicurl, leasegoodto, verifytoken
-          FROM syndicated_hubbub WHERE userid = ?},
-        undef, $u->id
+        q{SELECT id, huburl, topicurl, leasegoodto, verifytoken
+          FROM syndicated_hubbub2 WHERE userid = ? AND huburl = ? AND topicurl = ?},
+        undef, $u->id, $huburl, $topicurl
     );
     return if $dbh->errstr;
 
-    # bail now if things look good
-    return if $row && $row->{huburl} eq $huburl && $row->{topicurl} eq $topicurl;
+    # if we do, then we want to update this row with a new lastseenat so our collector
+    # doesn't purge this subscription later
+    if ( $row && $row->{id} ) {
+        $dbh->do( 'UPDATE syndicated_hubbub2 SET lastseenat = UNIX_TIMESTAMP() WHERE id = ?',
+                  undef, $row->{id} );
+        return;
+    }
 
-    # not the same, or the row doesn't exist, so just replace, we don't need the old data
+    # this row (exactly) doesn't exist, so register it
+    my $id = LJ::alloc_global_counter( 'F' ) or return;
     $dbh->do(
-        q{REPLACE INTO syndicated_hubbub (userid, huburl, topicurl, leasegoodto, verifytoken)
-          VALUES (?, ?, ?, ?, ?)},
-        undef, $u->id, $huburl, $topicurl, undef, LJ::rand_chars( 64 )
+        q{INSERT IGNORE INTO syndicated_hubbub2 (id, userid, huburl, topicurl, leasegoodto, verifytoken, lastseenat)
+          VALUES (?, ?, ?, ?, ?, ?, UNIX_TIMESTAMP())},
+        undef, $id, $u->id, $huburl, $topicurl, undef, LJ::rand_chars( 64 )
     );
 
     # no error checking for reasons above...
@@ -495,6 +509,10 @@ sub register_hubbub_feed {
 # post our subscriptions using HTTPS before we can do the "proper" way, though.
 sub process_hubbub_notification {
     my $bodyref = shift;
+
+    # needed for the future; get it early
+    my $sclient = LJ::theschwartz() or die;
+    my $dbr = LJ::get_db_reader() or die;
 
     # FIXME: this probably will explode horribly with aggregated content, so as
     # soon as that becomes supported somewhere, we need to fix this
@@ -510,34 +528,46 @@ sub process_hubbub_notification {
     }
 
     # worked, get self which should be the topic url
-    unless ( $feed->{self} ) {
-        warn "[$$] PubSubHubbub notification has no self link?\n";
+    unless ( $feed->{self} && ref $feed->{self} eq 'ARRAY' ) {
+        warn "[$$] PubSubHubbub notification has no self link(s)?\n";
         return;
     }
 
-    my $dbr = LJ::get_db_reader() or die;
-    my ( $uid ) = $dbr->selectrow_array(
-        'SELECT userid FROM syndicated_hubbub WHERE topicurl = ?',
-        undef, $feed->{self}
-    );
-    die if $dbr->err;
+    # now select everybody that uses this topic url.  note that this is actually kind of
+    # weird.  in theory we should only ever have one topicurl mapped to one feed, but if
+    # someone comes along and maps a bunch of topicurls to their feeds first, they can
+    # "break" hubbub subscriptions for popular feeds.  so we just make it so that we
+    # allow N feeds to have the same topic url and we schedule a refresh for all of them.
+    # a little more work for us and potentially more traffic, but it's better than the
+    # alternative...
+    my @uids;
+    foreach my $topic ( @{ $feed->{self} } ) {
+        my $uids = $dbr->selectcol_arrayref(
+            'SELECT userid FROM syndicated_hubbub2 WHERE topicurl = ?',
+            undef, $topic
+        );
+        die if $dbr->err;
 
-    # user must still be visible for us to care
-    my $u = LJ::load_userid( $uid ) or die;
-    return unless $u->is_visible;
+        push @uids, @{ $uids || [] };
+    }
 
-    # great! schedule a synsuck job :-)
-    my $sclient = LJ::theschwartz() or die;
-    $sclient->insert_jobs( TheSchwartz::Job->new(
-        funcname => 'DW::Worker::SynSuck',
-        arg      => { userid => $u->id },
-        uniqkey  => "synsuck:" . $u->id,
-        priority => 200, # arbitrary, high
-    ) );
+    # iterate over each user
+    foreach my $uid ( @uids ) {
+        # user must still be visible for us to care
+        my $u = LJ::load_userid( $uid ) or die;
+        next unless $u->is_visible;
 
-    # let devserver know
-    if ( $LJ::IS_DEV_SERVER ) {
-        warn "[$$] PubSubHubbub notification scheduled job for " . $u->user . "(" . $u->id . ").\n";
+        # great! schedule a synsuck job :-)
+        $sclient->insert_jobs( TheSchwartz::Job->new(
+            funcname => 'DW::Worker::SynSuck',
+            arg      => { userid => $u->id },
+            uniqkey  => "synsuck:" . $u->id,
+            priority => 200, # arbitrary, high, try to go fast
+        ) );
+
+        # let devserver know
+        warn "[$$] PubSubHubbub notification scheduled job for " . $u->user . "(" . $u->id . ").\n"
+            if $LJ::IS_DEV_SERVER;
     }
     return;
 }
diff -r b7370be6d607 -r b2c0b0b8bf11 cgi-bin/ljlib.pl
--- a/cgi-bin/ljlib.pl	Thu Sep 03 17:21:35 2009 +0000
+++ b/cgi-bin/ljlib.pl	Thu Sep 03 21:58:25 2009 +0000
@@ -2087,7 +2087,8 @@ sub get_secret
 # LJ-generic domains:
 #  $dom: 'S' == style, 'P' == userpic, 'A' == stock support answer
 #        'C' == captcha, 'E' == external user, 'O' == school
-#        'L' == poLL,  'M' == Messaging, 'H' == sHopping cart
+#        'L' == poLL,  'M' == Messaging, 'H' == sHopping cart,
+#        'F' == PubSubHubbub subscription id (F for Fred)
 #
 sub alloc_global_counter
 {
@@ -2097,7 +2098,7 @@ sub alloc_global_counter
 
     # $dom can come as a direct argument or as a string to be mapped via hook
     my $dom_unmod = $dom;
-    unless ($dom =~ /^[ESLPOAHCM]$/) {
+    unless ($dom =~ /^[ESLPOAHCMF]$/) {
         $dom = LJ::run_hook('map_global_counter_domain', $dom);
     }
     return LJ::errobj("InvalidParameters", params => { dom => $dom_unmod })->cond_throw
@@ -2137,6 +2138,8 @@ sub alloc_global_counter
         my $max_poll      = $dbh->selectrow_array("SELECT MAX(pollid) FROM poll");
         my $max_pollowner = $dbh->selectrow_array("SELECT MAX(pollid) FROM pollowner");
         $newmax = $max_poll > $max_pollowner ? $max_poll : $max_pollowner;
+    } elsif ( $dom eq 'F' ) {
+        $newmax = $dbh->selectrow_array( 'SELECT MAX(id) FROM syndicated_hubbub2' );
     } else {
         $newmax = LJ::run_hook('global_counter_init_value', $dom);
         die "No alloc_global_counter initalizer for domain '$dom'"
diff -r b7370be6d607 -r b2c0b0b8bf11 cgi-bin/parsefeed.pl
--- a/cgi-bin/parsefeed.pl	Thu Sep 03 17:21:35 2009 +0000
+++ b/cgi-bin/parsefeed.pl	Thu Sep 03 21:58:25 2009 +0000
@@ -280,10 +280,9 @@ sub StartTag {
                 return err( 'Feed not yet defined' )
                     unless $feed;
 
-                # we only take the first value, so if the user specifies
-                # multiple hubs (which is allowed) then we are going to just
-                # pick the first.  FIXME: subscribe to all?
-                $feed->{$_{rel}} ||= $_{href};
+                # allow these to be specified multiple times, the spec allows for multiple
+                # hubs.  the self link shouldn't allow multiples but it won't hurt if we let it.
+                push @{$feed->{$_{rel}} ||= []}, $_{href};
                 last TAGS;
             }
 
diff -r b7370be6d607 -r b2c0b0b8bf11 htdocs/misc/hubbub.bml
--- a/htdocs/misc/hubbub.bml	Thu Sep 03 17:21:35 2009 +0000
+++ b/htdocs/misc/hubbub.bml	Thu Sep 03 21:58:25 2009 +0000
@@ -71,23 +71,27 @@ _c?><?_code
 
     # validate the token matches something we know about
     my $dbh = LJ::get_db_writer() or return $fail->();
-    my ( $uid, $topicurldb )  = $dbh->selectrow_array(
-        'SELECT userid, topicurl FROM syndicated_hubbub WHERE verifytoken = ?',
+    my ( $id, $uid, $topicurldb )  = $dbh->selectrow_array(
+        'SELECT id, userid, topicurl FROM syndicated_hubbub2 WHERE verifytoken = ?',
         undef, $verifytoken
     );
     return $fail->( $dbh->errstr ) if $dbh->err;
 
     # must have a uid, and the topic url must match
-    return $err->( 'no matching row found [' . $verifytoken . ']' ) unless $uid;
-    return $err->( 'topic_url mismatch' ) unless $topicurl eq $topicurldb;
+    return $err->( 'no matching row found: ' . $verifytoken ) unless $uid;
+
+    # if the topic url doesn't match, then that's a terrible thing and we're not sure
+    # why, so log extra information
+    return $err->( 'topic_url mismatch: db=' . $topicurldb . ', hub=' . $topicurl )
+        unless $topicurl eq $topicurldb;
 
     # validate the user is still valid (didn't get suspended or anything)
     my $u = LJ::load_userid( $uid ) or return $fail->( 'failed to load user' );
     return $err->( 'user no longer visible' ) unless $u->is_visible;
     
     # okay, great; let's update our subscription record
-    $dbh->do( 'UPDATE syndicated_hubbub SET leasegoodto = UNIX_TIMESTAMP() + ?',
-              undef, $leasetime );
+    $dbh->do( 'UPDATE syndicated_hubbub2 SET leasegoodto = UNIX_TIMESTAMP() + ? WHERE id = ?',
+              undef, $leasetime, $id );
 
     # if we're a development server, warn some debugging so we know the hub hit us
     if ( $LJ::IS_DEV_SERVER ) {
--------------------------------------------------------------------------------

Post a comment in response:

This account has disabled anonymous posting.
If you don't have an account you can create one now.
HTML doesn't work in the subject.
More info about formatting

If you are unable to use this captcha for any reason, please contact us by email at support@dreamwidth.org