Mark Smith ([staff profile] mark) wrote in [site community profile] changelog 2012-03-13 07:59 am

[dw-free] Switch from syncitems to multiple getevents in entry imports

[commit: http://hg.dwscoalition.org/dw-free/rev/3c01c3ea5414]

Switch from syncitems to multiple getevents in entry imports

We were having some issues with syncitems failing on large accounts. Also,
honestly, it's really a finicky system and hard to use correctly. I decided
to rip it out and replace it with a more straightforward approach.

Now we use a selecttype of 'one' with an itemid of -1 to fetch the most
recent entry. Since itemids are monotonically increasing, this gives us an
upper bound on the range; and since we know that every account starts at
itemid 1, that gives us a lower bound.

From there it is a simple matter to start requesting entries in groups of
100. This is far more efficient -- for both us and the remote side -- than
using syncitems.
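The batched fetch, again as a hedged sketch: here %has marks jitemids we
already imported and $process_entry stands in for the real per-entry import
handler, while $rpc and $maxid carry over from the sketch above:

    # Fetch one batch of itemids with selecttype 'multiple'.
    my $fetch_events = sub {
        my $res = $rpc->call( 'LJ.XMLRPC.getevents',
            {
                ver         => 1,
                selecttype  => 'multiple',
                itemids     => join( ',', @_ ),
                lineendings => 'unix',
            }
        )->result;
        $process_entry->( $_ ) foreach @{ $res->{events} || [] };
    };

    # Walk the full range, skipping what we already have, in groups of 100.
    my @toload;
    foreach my $jid ( 1 .. $maxid ) {
        push @toload, $jid unless $has{$jid};
        if ( scalar @toload == 100 ) {
            $fetch_events->( @toload );
            @toload = ();
        }
    }
    $fetch_events->( @toload ) if @toload;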

(The reason we can do this is that we never go back and edit posts. The
strength of the syncitems mode is that it supports downloading edited
versions of a post.)

Patch by [staff profile] mark.

Files modified:
  • cgi-bin/DW/Worker/ContentImporter.pm
  • cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm
  • cgi-bin/DW/Worker/ContentImporter/Local/Entries.pm
--------------------------------------------------------------------------------
diff -r df2d8d5002f7 -r 3c01c3ea5414 cgi-bin/DW/Worker/ContentImporter.pm
--- a/cgi-bin/DW/Worker/ContentImporter.pm	Mon Mar 05 18:50:04 2012 +0800
+++ b/cgi-bin/DW/Worker/ContentImporter.pm	Tue Mar 13 08:01:14 2012 +0000
@@ -25,7 +25,6 @@
 use strict;
 use Time::HiRes qw/ sleep time /;
 use Carp qw/ croak confess /;
-use Encode qw/ encode_utf8 /;
 use Storable;
 use LWP::UserAgent;
 use XMLRPC::Lite;
diff -r df2d8d5002f7 -r 3c01c3ea5414 cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm
--- a/cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm	Mon Mar 05 18:50:04 2012 +0800
+++ b/cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm	Tue Mar 13 08:01:14 2012 +0000
@@ -107,49 +107,31 @@
     $log->( 'Loaded xpost map with %d entries.', scalar( keys %$xpost_map ) );
     $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 );
 
-    # this is a helper sub that steps a MySQL formatted time by some offset
-    # arguments: '2008-01-01 12:03:53', -1 ... returns '2008-01-01 12:03:52'
-    my $step_time = sub {
-        return LJ::mysql_time( LJ::mysqldate_to_time( $_[0] ) + $_[1] );
-    };
+    # get the itemid of the most recent entry (just so we know how many entries there have
+    # been in the life of this account)
+    $log->( 'Fetching the most recent entry.' );
+    my $last = $class->call_xmlrpc( $data, 'getevents',
+        {
+            ver         => 1,
+            selecttype  => 'one',
+            itemid      => -1,
+            lineendings => 'unix',
+        }
+    );
+    return $temp_fail->( 'XMLRPC failure: ' . $last->{faultString} )
+        if ! $last || $last->{fault};
+    return $temp_fail->( 'Failed to fetch the most recent entry.' )
+        unless ref $last->{events} eq 'ARRAY' && scalar @{$last->{events}} == 1;
 
-    # load the syncitems list; but never try to load the same lastsync time twice, just
-    # in case.  also, we have to do some pretty annoying back-steps and not actually trust
-    # the last synced time because it's possible in some rare cases to lose entries by
-    # just trusting what the remote end is telling you.  (FIXME: link to a writeup of this
-    # somewhere...)
-    my ( $lastsync, %tried_syncs, %sync );
-    while ( $tried_syncs{$lastsync} < 2 ) {
-        $log->( 'Calling syncitems; lastsync = %s.', ( $lastsync || 'undef' ) );
-        my $hash = $class->call_xmlrpc( $data, 'syncitems', { lastsync => $lastsync } );
-        return $temp_fail->( 'XMLRPC failure: ' . $hash->{faultString} )
-            if ! $hash || $hash->{fault};
-
-        foreach my $item ( @{$hash->{syncitems} || []} ) {
-            # ensure we always step the sync time forward
-            my $synctime = $step_time->( $item->{time}, -1 );
-            $lastsync = $synctime
-                if !defined $lastsync || $synctime gt $lastsync;
-
-            # but only store it in %sync if it's an entry
-            $sync{$1} = [ $item->{action}, $synctime ]
-                if $item->{item} =~ /^L-(\d+)$/;
-        }
-
-        # now we can mark this, as we have officially syncd this time
-        $tried_syncs{$lastsync}++;
-
-        $title->( 'syncitems - %d left', $hash->{total} );
-        $log->( '    retrieved %d items and %d left to sync', $hash->{count}, $hash->{total} );
-        last if $hash->{count} == $hash->{total};
-    }
-    $log->( 'Syncitems finished with %d items pre-prune.', scalar( keys %sync ) );
-    $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 );
+    # extract the maximum jitemid from this event
+    my $maxid = $last->{events}->[0]->{itemid};
+    $log->( 'Discovered that the maximum jitemid on the remote is %d.', $maxid );
 
     # this is an optimization.  since we never do an edit event (only post!) we will
     # never get changes anyway.  so let's remove from the list of things to sync any
     # post that we already know about.  (not that we really care, but it's much nicer
     # on people we're pulling from.)
+    my %has; # jitemids we have
     foreach my $url ( keys %$entry_map ) {
 
         # but first, let's skip anything that isn't from the server we are importing
@@ -169,251 +151,152 @@
         # skip them here.
         next if $data->{options}->{lj_entries_remap_icon};
 
-        delete $sync{$1 >> 8};
+        # yes, we can has this jitemid from the remote side
+        $has{$1 >> 8} = 1;
     }
-    $log->( 'Syncitems now has %d items post-prune (first pass).', scalar( keys %sync ) );
+    $log->( 'Identified %d items we already know about (first pass).', scalar( keys %has ) );
     $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 );
 
     # this is another optimization.  we know crossposted entries can be removed from
     # the list of things we will import, as we generated them to begin with.
     foreach my $itemid ( keys %$xpost_map ) {
-        delete $sync{$itemid};
+        $has{$itemid} = 1;
     }
-    $log->( 'Syncitems now has %d items post-prune (second pass).', scalar( keys %sync ) );
+    $log->( 'Identified %d items we already know about (second pass).', scalar( keys %has ) );
     $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 );
 
     $title->( 'post-prune' );
 
-    # simple helper sub
-    my $realtime = sub {
-        my $id = shift;
-        return $sync{$id}->[1] if @{$sync{$id} || []};
+    # this is a useful helper sub we use
+    my $count = 0;
+    my $process_entry = sub {
+        my $evt = $_[0];
+        $evt->{key} = $evt->{url};
+        $count++;
+        $log->( '    %d %s %s; mapped = %d (import_source) || %d (xpost).',
+                $evt->{itemid}, $evt->{url}, $evt->{logtime}, $entry_map->{$evt->{key}},
+                $xpost_map->{$evt->{itemid}} );
+
+        # always set the picture_keyword property though, in case they're a paid
+        # user come back to fix their keywords.  this forcefully overwrites their
+        # local picture keyword
+        if ( my $jitemid = $entry_map->{$evt->{key}} ) {
+            my $entry = LJ::Entry->new( $u, jitemid => $jitemid );
+            my $kw = $evt->{props}->{picture_keyword};
+            if ( $u->userpic_have_mapid ) {
+                $entry->set_prop( picture_mapid => $u->get_mapid_from_keyword( $kw, create => 1) );
+            } else {
+                $entry->set_prop( picture_keyword => $kw );
+            }
+        }
+
+        # now try to skip it if we already have it
+        return if $entry_map->{$evt->{key}} || $xpost_map->{$evt->{itemid}} || $has{$evt->{itemid}};
+
+        # clean up event for LJ and remap friend groups
+        my @item_errors;
+        my $allowmask = $evt->{allowmask};
+        my $newmask = $class->remap_groupmask( $data, $allowmask );
+
+        # if we are unable to determine a good groupmask, then fall back to making
+        # the entry private and mark the error.
+        if ( $allowmask != 1 && $newmask == 1 ) {
+            $newmask = 0;
+            push @item_errors, "Could not determine groups to post to.";
+        }
+        $evt->{allowmask} = $newmask;
+
+        # now try to determine if we need to post this as a user
+        my $posteru;
+        if ( $data->{usejournal} ) {
+            my ( $posterid, $fid ) = $class->get_remapped_userids( $data, $evt->{poster} );
+
+            unless ( $posterid ) {
+                # FIXME: need a better way of totally dying...
+                push @item_errors, "Unable to map poster from LJ user '$evt->{poster}' to local user.";
+                $status->(
+                    remote_url => $evt->{url},
+                    errors     => \@item_errors,
+                );
+                return;
+            }
+
+            $posteru = LJ::load_userid( $posterid );
+        }
+
+        # we just link polls to the original site
+# FIXME: this URL should be from some method and not manually constructed
+        my $event = $evt->{event};
+        $event =~ s!<.+?-poll-(\d+?)>![<a href="http://www.$data->{hostname}/poll/?id=$1">Poll #$1</a>]!g;
+
+        if ( $event =~ m/<.+?-embed-.+?>/ ) {
+            $event =~ s/<.+?-embed-.+?>//g;
+
+            push @item_errors, "Entry contained an embed tag, please manually re-add the embedded content.";
+        }
+
+        if ( $event =~ m/<.+?-template-.+?>/ ) {
+            $event =~ s/<.+?-template-.+?>//g;
+
+            push @item_errors, "Entry contained a template tag, please manually re-add the templated content.";
+        }
+
+        $evt->{event} = $class->remap_lj_user( $data, $event );
+        $evt->{subject} = $class->remap_lj_user( $data, $evt->{subject} || "" );
+
+        # actually post it
+        my ( $ok, $res ) =
+            DW::Worker::ContentImporter::Local::Entries->post_event( $data, $entry_map, $u, $posteru, $evt, \@item_errors );
+
+        # we don't need this text anymore, so nuke it to try to save memory
+        delete $evt->{event};
+        delete $evt->{subject};
+
+        # now record any errors that happened
+        $status->(
+            remote_url => $evt->{url},
+            post_res   => $res,
+            errors     => \@item_errors,
+        ) if @item_errors;
+
+    };
+
+    # helper to load some events
+    my $fetch_events = sub {
+        $log->( 'Fetching %d items.', scalar @_ );
+        $title->( 'getevents - %d to %d', $_[0], $_[-1] );
+
+        # try to get it from the remote server
+        my $hash = $class->call_xmlrpc( $data, 'getevents',
+            {
+                ver         => 1,
+                itemids     => join( ',', @_ ),
+                selecttype  => 'multiple',
+                lineendings => 'unix',
+            }
+        );
+
+        # if we get an error, then we have to abort the import
+        return $temp_fail->( 'XMLRPC failure: ' . $hash->{faultString} )
+            if ! $hash || $hash->{fault};
+
+        # good, import this event
+        $process_entry->( $_ )
+            foreach @{ $hash->{events} || [] };
     };
 
     # now get the actual events
-    while ( scalar( keys %sync ) > 0 ) {
-        my ( $count, $last_itemid ) = ( 0, undef );
-
-        # this is a useful helper sub we use in both import modes: lastsync and one
-        my $process_entry = sub {
-            my $evt = shift;
-
-            $count++;
-
-            $evt->{realtime} = $realtime->( $evt->{itemid} );
-            $evt->{key} = $evt->{url};
-
-            # skip this if we've already dealt with it before
-            $log->( '    %d %s %s; mapped = %d (import_source) || %d (xpost).',
-                    $evt->{itemid}, $evt->{url}, $evt->{realtime}, $entry_map->{$evt->{key}},
-                    $xpost_map->{$evt->{itemid}} );
-
-            # always set the picture_keyword property though, in case they're a paid
-            # user come back to fix their keywords.  this forcefully overwrites their
-            # local picture keyword
-            if ( my $jitemid = $entry_map->{$evt->{key}} ) {
-                my $entry = LJ::Entry->new( $u, jitemid => $jitemid );
-                my $kw = $evt->{props}->{picture_keyword};
-                if ( $u->userpic_have_mapid ) {
-                    $entry->set_prop( picture_mapid => $u->get_mapid_from_keyword( $kw, create => 1) );
-                } else {
-                    $entry->set_prop( picture_keyword => $kw );
-                }
-            }
-
-            # now try to skip it
-            my $sync = delete $sync{$evt->{itemid}};
-            return if $entry_map->{$evt->{key}} || !defined $sync || $xpost_map->{$evt->{itemid}};
-
-            # clean up event for LJ
-            my @item_errors;
-
-            # remap friend groups
-            my $allowmask = $evt->{allowmask};
-            my $newmask = $class->remap_groupmask( $data, $allowmask );
-
-            # if we are unable to determine a good groupmask, then fall back to making
-            # the entry private and mark the error.
-            if ( $allowmask != 1 && $newmask == 1 ) {
-                $newmask = 0;
-                push @item_errors, "Could not determine groups to post to.";
-            }
-            $evt->{allowmask} = $newmask;
-
-            # now try to determine if we need to post this as a user
-            my $posteru;
-            if ( $data->{usejournal} ) {
-                my ( $posterid, $fid ) = $class->get_remapped_userids( $data, $evt->{poster} );
-
-                unless ( $posterid ) {
-                    # FIXME: need a better way of totally dying...
-                    push @item_errors, "Unable to map poster from LJ user '$evt->{poster}' to local user.";
-                    $status->(
-                        remote_url => $evt->{url},
-                        errors     => \@item_errors,
-                    );
-                    return;
-                }
-
-                $posteru = LJ::load_userid( $posterid );
-            }
-
-            # we just link polls to the original site
-# FIXME: this URL should be from some method and not manually constructed
-            my $event = $evt->{event};
-            $event =~ s!<.+?-poll-(\d+?)>![<a href="http://www.$data->{hostname}/poll/?id=$1">Poll #$1</a>]!g;
-
-            if ( $event =~ m/<.+?-embed-.+?>/ ) {
-                $event =~ s/<.+?-embed-.+?>//g;
-
-                push @item_errors, "Entry contained an embed tag, please manually re-add the embedded content.";
-            }
-
-            if ( $event =~ m/<.+?-template-.+?>/ ) {
-                $event =~ s/<.+?-template-.+?>//g;
-
-                push @item_errors, "Entry contained a template tag, please manually re-add the templated content.";
-            }
-
-            $evt->{event} = $class->remap_lj_user( $data, $event );
-            $evt->{subject} = $class->remap_lj_user( $data, $evt->{subject} || "" );
-
-            # actually post it
-            my ( $ok, $res ) =
-                DW::Worker::ContentImporter::Local::Entries->post_event( $data, $entry_map, $u, $posteru, $evt, \@item_errors );
-
-            # we don't need this text anymore, so nuke it to try to save memory
-            delete $evt->{event};
-            delete $evt->{subject};
-
-            # now record any errors that happened
-            $status->(
-                remote_url => $evt->{url},
-                post_res   => $res,
-                errors     => \@item_errors,
-            ) if @item_errors;
-
-        };
-
-        # calculate what time to get entries for
-        my ( $tries, $lastgrab, $hash ) = ( 0, undef, undef );
-SYNC:   while ( $tries++ <= 10 ) {
-
-            # if we ever get in here with no entries left, we're done.  this sometimes happens
-            # when the manual advance import code hits the end of the sidewalk and runs out of
-            # things to import.
-            last unless keys %sync;
-
-            # calculate the oldest entry we haven't retrieved yet, and offset that time by
-            # $tries, so we can break the 'broken client' logic (note: we assert that we are
-            # not broken.)
-            my @keys = sort { $sync{$a}->[1] cmp $sync{$b}->[1] } keys %sync;
-            $last_itemid = $keys[0];
-            $lastgrab = $step_time->( $sync{$last_itemid}->[1], -$tries );
-
-            $title->( 'getevents - lastsync %s', $lastgrab );
-            $log->( 'Loading entries; lastsync = %s, itemid = %d.', $lastgrab, $keys[0] );
-            $hash = $class->call_xmlrpc( $data, 'getevents',
-                {
-                    ver         => 1,
-                    lastsync    => $lastgrab,
-                    selecttype  => 'syncitems',
-                    lineendings => 'unix',
-                }
-            );
-
-            # sometimes LJ doesn't like us on large imports or imports where the user has
-            # used the mass privacy tool in the past.  so, if we run into the 'you are broken'
-            # error, let's grab some older entries.  the goal here is mainly to advance
-            # the clock by at least 10 seconds.  this means we may end up downloading only
-            # one or two entries manually, but in some cases it means we may end up doing
-            # hundreds of entries.  depends on what kind of state the user is in.
-            if ( $hash && $hash->{fault} && $hash->{faultString} =~ /broken/ ) {
-                $log->( '    repeated requests error, falling back to manual advance.' );
-
-                # figure out which items we should try to get, based on the logic that we
-                # should get at least 10 seconds or 20 items, whichever is more.  the former
-                # for the case where mass privacy breaks us, the latter for the case where
-                # the Howard Predicament* bites us.
-                #
-                # * the Howard Predicament has only been observed in the wild once.  the
-                #   noted behavior is that, no matter what lastsync value we send, the
-                #   remote server chooses to tell us we're broken.  this should not be
-                #   possible, but it happened to my friend Howard, and I couldn't solve
-                #   it through any other means than bypassing lastsync.  sigh.
-                #
-
-                my $stop_after = $step_time->( $lastgrab, $tries + 10 );
-                $log->( 'Manual advance will stop at %s or 20 entries, whichever is more.', $stop_after );
-
-                my @keys = sort { $sync{$a}->[1] cmp $sync{$b}->[1] } keys %sync;
-                pop @keys
-                    while scalar( @keys ) > 20 && $sync{$keys[-1]}->[1] gt $stop_after;
-
-                $log->( 'Found %d entries to grab manually.', scalar( @keys ) );
-
-                # now get them, one at a time
-                my @events;
-                foreach my $itemid ( @keys ) {
-                    $log->( 'Fetching itemid %d.', $itemid );
-
-                    # try to get it from the remote server
-                    $hash = $class->call_xmlrpc( $data, 'getevents',
-                        {
-                            ver         => 1,
-                            itemid      => $itemid,
-                            selecttype  => 'one',
-                            lineendings => 'unix',
-                        }
-                    );
-
-                    # if we get an error, then we have to abort the import
-                    return $temp_fail->( 'XMLRPC failure: ' . $hash->{faultString} )
-                        if ! $hash || $hash->{fault};
-
-                    # another observed oddity: sometimes the remote server swears that
-                    # an entry exists, but we can't get it.  even asking for it by name.
-                    # if there was no error, and no entry returned, skip it.
-                    if ( scalar( @{ $hash->{events} || [] } ) == 0 ) {
-                        $log->( 'Itemid %d seems to be a phantom.', $itemid );
-                        delete $sync{$itemid};
-                        next;
-                    }
-
-                    # good, import this event
-                    $process_entry->( $_ )
-                        foreach @{ $hash->{events} || [] };
-                }
-
-                next SYNC;
-            }
-
-            # bail if we get a different error
-            return $temp_fail->( 'XMLRPC failure: ' . $hash->{faultString} )
-                if ! $hash || $hash->{fault};
-
-            # if we get here we're probably in good shape, bail out
-            last;
+    my @toload;
+    foreach my $jid ( 0..$maxid ) {
+        push @toload, $jid
+            unless exists $has{$jid} && $has{$jid};
+        if ( scalar @toload == 100 ) {
+            $fetch_events->( @toload );
+            @toload = ();
         }
-
-        # there is a slight chance we will get here if we run out of 'broken' retries
-        # so check for that
-        return $temp_fail->( 'XMLRPC failure: ' . $hash->{faultString} )
-            if ! $hash || $hash->{fault};
-
-        # iterate over events and import them
-        $process_entry->( $_ )
-            foreach @{$hash->{events} || []};
-
-        # if we get here, we got a good result, which means that the entry we tried to get
-        # should be in the results.  if it's not, to prevent an infinite loop, let's mark
-        # it as retrieved.  FIXME: this causes problems with mass-edited journals
-        delete $sync{$last_itemid} if defined $last_itemid;
-
-        # log some status for later
-        $log->( '    counted %d entries, lastgrab is now %s.', $count, $lastgrab );
-        $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 );
     }
+    $fetch_events->( @toload )
+        if scalar @toload > 0;
 
     # mark the comments mode as ready to schedule
     my $dbh = LJ::get_db_writer();
@@ -427,5 +310,4 @@
     return $ok->();
 }
 
-
 1;
diff -r df2d8d5002f7 -r 3c01c3ea5414 cgi-bin/DW/Worker/ContentImporter/Local/Entries.pm
--- a/cgi-bin/DW/Worker/ContentImporter/Local/Entries.pm	Mon Mar 05 18:50:04 2012 +0800
+++ b/cgi-bin/DW/Worker/ContentImporter/Local/Entries.pm	Tue Mar 13 08:01:14 2012 +0000
@@ -19,6 +19,7 @@
 use strict;
 
 use Carp qw/ croak /;
+use Encode qw/ encode_utf8 /;
 
 =head1 NAME
 
@@ -96,8 +97,8 @@
 
     my %proto = (
         lineendings => 'unix',
-        subject => $evt->{subject},
-        event => $evt->{event},
+        subject => encode_utf8( $evt->{subject} ),
+        event => encode_utf8( $evt->{event} ),
         security => $evt->{security},
         allowmask => $evt->{allowmask},
 
@@ -118,6 +119,7 @@
         personifi_lang => 1,
         personifi_tags => 1,
         give_features => 1,
+        spam_counter => 1,
     );
     foreach my $prop ( keys %$props ) {
         next if $bad_props{$prop};
@@ -126,7 +128,7 @@
             or next;
         next if $p->{ownership} eq 'system';
 
-        $proto{"prop_$prop"} = $props->{$prop};
+        $proto{"prop_$prop"} = encode_utf8( $props->{$prop} );
     };
 
     # Overwrite these here in case we're importing from an imported journal (hey, it could happen)
@@ -156,7 +158,7 @@
 
     } else {
         $u->do( "UPDATE log2 SET logtime = ? where journalid = ? and jitemid = ?",
-                undef, $evt->{realtime}, $u->userid, $res{itemid} );
+                undef, $evt->{logtime}, $u->userid, $res{itemid} );
         $map->{$evt->{key}} = $res{itemid};
         return ( 1, \%res );
 
--------------------------------------------------------------------------------

[personal profile] fu 2012-03-14 06:14 am (UTC)(link)
Excellent! I love how much cleaner this is now.

[staff profile] denise 2012-03-14 06:25 am (UTC)(link)
Aw, we've lost the Howard Predicament.