mark: A photo of Mark kneeling on top of the Taal Volcano in the Philippines. It was a long hike. (Default)
Mark Smith ([staff profile] mark) wrote in [site community profile] changelog2009-03-02 09:12 pm

[dw-free] Allow importing of your journal from another LiveJournal-based site.

[commit: http://hg.dwscoalition.org/dw-free/rev/0f2d825b5ddc]

http://bugs.dwscoalition.org/show_bug.cgi?id=114

Add lots of logging to the lj_entries import mode. Also make it resilient
in the case of another importer running, or running multiple times in quick
succession.

Patch by [staff profile] mark.

Files modified:
  • cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm
--------------------------------------------------------------------------------
diff -r e5e27214cc19 -r 0f2d825b5ddc cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm
--- a/cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm	Sun Mar 01 23:35:58 2009 +0000
+++ b/cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm	Mon Mar 02 21:12:00 2009 +0000
@@ -20,6 +20,7 @@ use base 'DW::Worker::ContentImporter::L
 use base 'DW::Worker::ContentImporter::LiveJournal';
 
 use Carp qw/ croak confess /;
+use Time::HiRes qw/ tv_interval gettimeofday /;
 use DW::Worker::ContentImporter::Local::Entries;
 
 sub work {
@@ -30,12 +31,16 @@ sub work {
         warn "Failure running job: $@\n";
         return $class->temp_fail( $job, 'Failure running job: %s', $@ );
     }
+
+    # FIXME: temporary hack to reclaim memory when we have imported entries
+    exit 0;
 }
 
 sub try_work {
     my ( $class, $job ) = @_;
     my $opts = $job->arg;
     my $data = $class->import_data( $opts->{userid}, $opts->{import_data_id} );
+    my $begin_time = [ gettimeofday() ];
 
     # failure wrappers for convenience
     my $fail      = sub { return $class->fail( $data, 'lj_entries', $job, @_ ); };
@@ -43,16 +48,42 @@ sub try_work {
     my $temp_fail = sub { return $class->temp_fail( $data, 'lj_entries', $job, @_ ); };
     my $status    = sub { return $class->status( $data, 'lj_entries', { @_ } ); };
 
+    # logging sub
+    my ( $logfile, $last_log_time );
+    my $log = sub {
+        $last_log_time ||= [ gettimeofday() ];
+
+        unless ( $logfile ) {
+            mkdir "$LJ::HOME/logs/imports";
+            mkdir "$LJ::HOME/logs/imports/$opts->{userid}";
+            open $logfile, ">>$LJ::HOME/logs/imports/$opts->{userid}/$opts->{import_data_id}.lj_entries.$$"
+                or return $temp_fail->( 'Internal server error creating log.' );
+            print $logfile "[0.00s 0.00s] Log started at " . LJ::mysql_time(gmtime()) . ".\n";
+        }
+
+        my $fmt = "[%0.4fs %0.1fs] " . shift() . "\n";
+        my $msg = sprintf( $fmt, tv_interval( $last_log_time ), tv_interval( $begin_time), @_ );
+
+        print $logfile $msg;
+        $job->debug( $msg );
+
+        $last_log_time = [ gettimeofday() ];
+    };
+
     # setup
     my $u = LJ::load_userid( $data->{userid} )
         or return $fail->( 'Unable to load target with id %d.', $data->{userid} );
-    my $entry_map = DW::Worker::ContentImporter::Local::Entries->get_entry_map( $u );
+    $log->( 'Import begun for %s(%d).', $u->user, $u->userid );
+
+    # load entry map
+    my $entry_map = DW::Worker::ContentImporter::Local::Entries->get_entry_map( $u ) || {};
+    $log->( 'Loaded entry map with %d entries.', scalar( keys %$entry_map ) );
 
     # load the syncitems list; but never try to load the same lastsync time twice, just
     # in case 
     my ( $lastsync, %tried_syncs, %sync );
     while ( $tried_syncs{$lastsync} < 2 ) {
-        warn "[$$] Attempting lastsync = " . ( $lastsync || 'undef' ) . "\n";
+        $log->( 'Calling syncitems; lastsync = %s.', ( $lastsync || 'undef' ) );
         my $hash = $class->call_xmlrpc( $data, 'syncitems', { lastsync => $lastsync } );
         return $temp_fail->( 'XMLRPC failure: ' . $hash->{faultString} )
             if ! $hash || $hash->{fault};
@@ -65,9 +96,10 @@ sub try_work {
             $tried_syncs{$lastsync}++;
         }
 
-        warn "     count $hash->{count} == total $hash->{total}\n";
+        $log->( '    retrieved %d items and %d left to sync', $hash->{count}, $hash->{total} );
         last if $hash->{count} == $hash->{total};
     }
+    $log->( 'Syncitems finished.' );
 
     my $realtime = sub {
         my $id = shift;
@@ -79,21 +111,45 @@ sub try_work {
         my $count = 0;
 
         # calculate what time to get entries for
-        my @keys = sort { $sync{$a}->[1] cmp $sync{$b}->[1] } keys %sync;
-        my $lastgrab = LJ::mysql_time( LJ::mysqldate_to_time( $sync{$keys[0]}->[1] ) - 1 );
+        my ( $tries, $lastgrab, $hash ) = ( 0, undef, undef );
+        while ( $tries++ <= 10 ) {
+            # calculate the oldest entry we haven't retrieved yet, and offset that time by
+            # $tries, so we can break the 'broken client' logic (note: we assert that we are
+            # not broken.)
+            my @keys = sort { $sync{$a}->[1] cmp $sync{$b}->[1] } keys %sync;
+            $lastgrab = LJ::mysql_time( LJ::mysqldate_to_time( $sync{$keys[0]}->[1] ) - $tries );
 
-        warn "[$$] Fetching from lastsync = $lastgrab forward\n";
-        my $hash = $class->call_xmlrpc( $data, 'getevents',
-            {
-                ver         => 1,
-                lastsync    => $lastgrab,
-                selecttype  => 'syncitems',
-                lineendings => 'unix',
+            $log->( 'Loading entries; lastsync = %s.', $lastgrab );
+            $hash = $class->call_xmlrpc( $data, 'getevents',
+                {
+                    ver         => 1,
+                    lastsync    => $lastgrab,
+                    selecttype  => 'syncitems',
+                    lineendings => 'unix',
+                }
+            );
+
+            # sometimes LJ doesn't like us on large imports, so try a few times to hush
+            # up the error.
+            if ( $hash && $hash->{fault} && $hash->{faultString} =~ /broken/ ) {
+                $log->( '    repeated requests error, retrying.' );
+                next;
             }
-        );
+
+            # bail if we get a different error
+            return $temp_fail->( 'XMLRPC failure: ' . $hash->{faultString} )
+                if ! $hash || $hash->{fault};
+
+            # if we get here we're probably in good shape, bail out
+            last;
+        }
+
+        # there is a slight chance we will get here if we run out of 'broken' retries
+        # so check for that
         return $temp_fail->( 'XMLRPC failure: ' . $hash->{faultString} )
             if ! $hash || $hash->{fault};
 
+        # iterate over events and import them
         foreach my $evt ( @{$hash->{events} || []} ) {
             $count++;
 
@@ -101,7 +157,7 @@ sub try_work {
             $evt->{key} = $evt->{url};
 
             # skip this if we've already dealt with it before
-            warn "     [$evt->{itemid}] $evt->{url} // $evt->{realtime} // map=$entry_map->{$evt->{key}}\n";
+            $log->( '    %d %s %s; mapped = %d.', $evt->{itemid}, $evt->{url}, $evt->{realtime}, $entry_map->{$evt->{key}} );
             my $sync = delete $sync{$evt->{itemid}};
             next if $entry_map->{$evt->{key}} || !defined $sync;
 
@@ -153,7 +209,7 @@ sub try_work {
             ) if @item_errors;
         }
 
-        warn "     count = $count && lastgrab = $lastgrab\n";
+        $log->( '    counted %d entries, lastgrab is now %s.', $count, $lastgrab );
     }
 
     return $ok->();
--------------------------------------------------------------------------------