[dw-free] imports of journals with massive numbers of comments time out, suck memory
[commit: http://hg.dwscoalition.org/dw-free/rev/ea5fc2c10527]
http://bugs.dwscoalition.org/show_bug.cgi?id=4112
This is basically a revamp of the comment import code so that it runs in
constant(ish) memory. The idea is to publish comments as their bodies are
fetched, instead of buffering the whole set and posting it at the end.
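The patch drives this with a chunked loop: bodies are fetched `$COMMENTS_FETCH_BODY` (500) at a time, and the new `$post_comments->()` closure runs after every chunk so posted text can be freed. A minimal sketch of that pattern, where `fetch_comment_bodies()` and `post_ready_comments()` are hypothetical stand-ins for the importer's XML body fetch and its `$post_comments` closure:

```perl
use strict;
use warnings;

# Hypothetical helpers, standing in for the importer's XML fetch and
# its $post_comments closure; the chunk size is the one the patch uses.
my $COMMENTS_FETCH_BODY = 500;
my $server_max_id       = 1_000_000;
my $lastid              = 0;

while ( $lastid < $server_max_id ) {
    # grab the next chunk of comment bodies from the remote site
    my $comments = fetch_comment_bodies(
        startid => $lastid + 1,
        count   => $COMMENTS_FETCH_BODY,
    );

    # post whatever is now complete (body present, parent already imported),
    # then drop the text so resident memory stays roughly one chunk's worth
    post_ready_comments( $comments );

    $lastid += $COMMENTS_FETCH_BODY;
}
```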
Since we still have to do the metadata import first (well, maybe not, but we
do it that way for now), I have also changed from using hashes
(memory-intensive when there are millions of comments) to using arrays.
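Concretely, the patch replaces each per-comment hashref with an arrayref whose slots are addressed through `use constant` indices (`C_id`, `C_remote_posterid`, and so on), avoiding perl's per-key hash overhead at this scale. A trimmed illustration of the pattern, using only three of the slots the patch defines:

```perl
use strict;
use warnings;

# Slot indices for the comment arrayref (the patch defines seventeen).
use constant C_id              => 0;
use constant C_remote_posterid => 1;
use constant C_state           => 2;

# before: $meta{$id} = { id => $id, posterid => $posterid, state => $state };
# after: an arrayref, addressed by the constants above
sub new_comment {
    my ( $id, $posterid, $state ) = @_;
    return [ undef, $posterid + 0, $state ];
}

my %meta;
$meta{42} = new_comment( 42, 1001, 'A' );
print $meta{42}->[C_state], "\n";    # prints 'A'
```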
I've also added preloading for users and props, which saves a little bit of
time, too.
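The preloading lives in a new `DW::Worker::ContentImporter::Local::Comments->precache()` helper, shown here lightly trimmed from the patch; `$UserCache` and `$EntryCache` are the module's package-level caches, and `LJ::Entry->preload_props_all()` is the companion method the patch adds to LJ/Entry.pm:

```perl
# From the patch (trimmed): bulk-load every poster and entry a comment
# batch touches, so insert_comment() doesn't do one lookup per comment.
sub precache {
    my ( $class, $u, $jitemids, $userids ) = @_;

    # one multi-get for all the posters in this batch
    $UserCache = LJ::load_userids( @$userids );

    # instantiate the entry singletons, then load their props in one pass
    foreach my $jitemid ( @$jitemids ) {
        $EntryCache->{$jitemid} = LJ::Entry->new( $u, jitemid => $jitemid );
    }
    LJ::Entry->preload_props_all();
}
```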
This still needs more testing, particularly with really large import
targets.
Patch by mark.
Files modified:
- bin/worker/content-importer
- bin/worker/import-scheduler
- cgi-bin/DW/Worker/ContentImporter/LiveJournal/Comments.pm
- cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm
- cgi-bin/DW/Worker/ContentImporter/Local/Comments.pm
- cgi-bin/LJ/Entry.pm
- cgi-bin/LJ/Talk.pm
- cgi-bin/LJ/User.pm
- cgi-bin/LJ/Web.pm
-------------------------------------------------------------------------------- diff -r fb24395b0f38 -r ea5fc2c10527 bin/worker/content-importer --- a/bin/worker/content-importer Thu Dec 29 06:52:33 2011 +0000 +++ b/bin/worker/content-importer Thu Dec 29 07:00:44 2011 +0000 @@ -17,8 +17,9 @@ use strict; use lib "$ENV{LJHOME}/cgi-bin"; +require 'ljlib.pl'; -require 'ljlib.pl'; +$LJ::LOG_GTOP = 1; use LJ::Worker::TheSchwartz; use DW::Worker::ContentImporter::LiveJournal::Bio; diff -r fb24395b0f38 -r ea5fc2c10527 bin/worker/import-scheduler --- a/bin/worker/import-scheduler Thu Dec 29 06:52:33 2011 +0000 +++ b/bin/worker/import-scheduler Thu Dec 29 07:00:44 2011 +0000 @@ -20,6 +20,7 @@ require 'ljlib.pl'; use Time::HiRes qw/ gettimeofday tv_interval /; +use Getopt::Long; $| = 1; # Line buffered. my $DEBUG = 0; @@ -153,13 +154,16 @@ } } +# run the job in a loop my $begin_time = [ gettimeofday() ]; -# run the job in a loop while ( 1 ) { - $DEBUG = 1 if $ARGV[0] =~ /^--?v(?:erbose)?$/; + my $once = 0; + GetOptions( 'verbose' => \$DEBUG, 'once' => \$once ); + _log( 'Main loop beginning...' ); worker_helper(); + last if $once; # now we sleep to the next one minute boundary, and if we're taking more # than one minute to run, we fire off an alert diff -r fb24395b0f38 -r ea5fc2c10527 cgi-bin/DW/Worker/ContentImporter/LiveJournal/Comments.pm --- a/cgi-bin/DW/Worker/ContentImporter/LiveJournal/Comments.pm Thu Dec 29 06:52:33 2011 +0000 +++ b/cgi-bin/DW/Worker/ContentImporter/LiveJournal/Comments.pm Thu Dec 29 07:00:44 2011 +0000 @@ -8,7 +8,7 @@ # Andrea Nall <anall@andreanall.com> # Mark Smith <mark@dreamwidth.org> # -# Copyright (c) 2009 by Dreamwidth Studios, LLC. +# Copyright (c) 2009-2011 by Dreamwidth Studios, LLC. # # This program is free software; you may redistribute it and/or modify it under # the same terms as Perl itself. For a copy of the license, please reference @@ -24,14 +24,34 @@ use Time::HiRes qw/ tv_interval gettimeofday /; use DW::Worker::ContentImporter::Local::Comments; +# to save memory, we use arrays instead of hashes. 
+use constant C_id => 0; +use constant C_remote_posterid => 1; +use constant C_state => 2; +use constant C_remote_parentid => 3; +use constant C_remote_jitemid => 4; +use constant C_body => 5; +use constant C_subject => 6; +use constant C_date => 7; +use constant C_props => 8; +use constant C_source => 9; +use constant C_entry_source => 10; +use constant C_orig_id => 11; +use constant C_done => 12; +use constant C_body_fixed => 13; +use constant C_local_parentid => 14; +use constant C_local_jitemid => 15; +use constant C_local_posterid => 16; + # these come from LJ our $COMMENTS_FETCH_META = 10000; our $COMMENTS_FETCH_BODY = 500; sub work { - # VITALLY IMPORTANT THAT THIS IS CLEARED BETWEEN JOBS %DW::Worker::ContentImporter::LiveJournal::MAPS = (); + DW::Worker::ContentImporter::Local::Comments->clear_caches(); + LJ::start_request(); my ( $class, $job ) = @_; my $opts = $job->arg; @@ -46,6 +66,31 @@ } } +sub new_comment { + my ( $id, $posterid, $state ) = @_; + return [ undef, $posterid+0, $state, undef, undef, undef, undef, undef, {}, + undef, undef, $id+0, undef, 0, undef, undef, undef ]; +} + +sub hashify { + return { + id => $_[0]->[C_id], + posterid => $_[0]->[C_local_posterid], + state => $_[0]->[C_state], + parentid => $_[0]->[C_local_parentid], + jitemid => $_[0]->[C_local_jitemid], + body => $_[0]->[C_body], + subject => $_[0]->[C_subject], + date => $_[0]->[C_date], + props => $_[0]->[C_props], + source => $_[0]->[C_source], + entry_source => $_[0]->[C_entry_source], + orig_id => $_[0]->[C_orig_id], + done => $_[0]->[C_done], + body_fixed => $_[0]->[C_body_fixed], + }; +} + sub try_work { my ( $class, $job, $opts, $data ) = @_; my $begin_time = [ gettimeofday() ]; @@ -87,6 +132,7 @@ my $u = LJ::load_userid( $data->{userid} ) or return $fail->( 'Unable to load target with id %d.', $data->{userid} ); $log->( 'Import begun for %s(%d).', $u->user, $u->userid ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # title munging my $title = sub { @@ -100,14 +146,15 @@ # this will take a entry_map (old URL -> new jitemid) and convert it into a jitemid map (old jitemid -> new jitemid) my $entry_map = DW::Worker::ContentImporter::Local::Entries->get_entry_map( $u ) || {}; $log->( 'Loaded entry map with %d entries.', scalar( keys %$entry_map ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # and xpost map my $xpost_map = $class->get_xpost_map( $u, $data ) || {}; $log->( 'Loaded xpost map with %d entries.', scalar( keys %$xpost_map ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # now backfill into jitemid_map - my $entry_source = {}; - my $jitemid_map = {}; + my ( %entry_source, %jitemid_map ); $log->( 'Filtering parameters: hostname=[%s], username=[%s].', $data->{hostname}, $data->{username} ); foreach my $url ( keys %$entry_map ) { # this works, see the Entries importer for more information @@ -120,20 +167,22 @@ if ( $url =~ m!/(\d+)\.html$! 
) { my $jitemid = $1 >> 8; - $jitemid_map->{$jitemid} = $entry_map->{$url}; - $entry_source->{$jitemid_map->{$jitemid}} = $url; + $jitemid_map{$jitemid} = $entry_map->{$url}; + $entry_source{$jitemid_map{$jitemid}} = $url; } } $log->( 'Entry map has %d entries post-prune.', scalar( keys %$entry_map ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); foreach my $jitemid ( keys %$xpost_map ) { - $jitemid_map->{$jitemid} = $xpost_map->{$jitemid}; - $entry_source->{$jitemid_map->{$jitemid}} = "CROSSPOSTER " . $data->{hostname} . " " . $data->{username} . " $jitemid " + $jitemid_map{$jitemid} = $xpost_map->{$jitemid}; + $entry_source{$jitemid_map{$jitemid}} = "CROSSPOSTER " . $data->{hostname} . " " . $data->{username} . " $jitemid " } # this will take a talk_map (old URL -> new jtalkid) and convert it to a jtalkid map (old jtalkid -> new jtalkid) my $talk_map = DW::Worker::ContentImporter::Local::Comments->get_comment_map( $u ) || {}; $log->( 'Loaded comment map with %d entries.', scalar( keys %$talk_map ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # now reverse it as above my $jtalkid_map = {}; @@ -152,8 +201,15 @@ } } + # for large imports, the two maps are big (contains URLs), so let's drop it + # since we're never going to use it again. PS I don't actually know if this + # frees the memory, but I'm hoping it does. + undef $talk_map; + undef $entry_map; + undef $xpost_map; + # parameters for below - my ( %meta, @userids, $identity_map, $was_external_user ); + my ( %meta, %identity_map, %was_external_user ); my ( $maxid, $server_max_id, $server_next_id, $lasttag ) = ( 0, 0, 1, '' ); # setup our parsing function @@ -166,18 +222,12 @@ # if we were last getting a comment, start storing the info if ( $lasttag eq 'comment' ) { # get some data on a comment - $meta{$temp{id}} = { - id => $temp{id}, - posterid => $temp{posterid}+0, - state => $temp{state} || 'A', - }; + $meta{$temp{id}} = new_comment( $temp{id}, $temp{posterid}+0, $temp{state} || 'A' ); - } elsif ( $lasttag eq 'usermap' && ! exists $identity_map->{$temp{id}} ) { - push @userids, $temp{id}; - + } elsif ( $lasttag eq 'usermap' && ! exists $identity_map{$temp{id}} ) { my ( $local_oid, $local_fid ) = $class->get_remapped_userids( $data, $temp{user} ); - $identity_map->{$temp{id}} = $local_oid; - $was_external_user->{$temp{id}} = 1 + $identity_map{$temp{id}} = $local_oid; + $was_external_user{$temp{id}} = 1 if $temp{user} =~ m/^ext_/; # If the remote username starts with ext_ flag it as external $log->( 'Mapped remote %s(%d) to local userid %d.', $temp{user}, $temp{id}, $local_oid ); @@ -191,7 +241,7 @@ # if we're in a maxid tag, we want to save that value so we know how much further # we have to go in downloading meta info return undef - unless $lasttag eq 'maxid' || + unless $lasttag eq 'maxid' || $lasttag eq 'nextid'; # save these values for later @@ -229,6 +279,119 @@ $server_max_id > $LJ::COMMENT_IMPORT_MAX; } $log->( 'Finished fetching metadata.' ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); + + # this method is called when we have some comments to post. this will do a best effort + # attempt to post all comments that are filled in. 
+ my $post_comments = sub { + # now iterate over each comment and build the nearly final structure + foreach my $id ( sort keys %meta ) { + my $comment = $meta{$id}; + next unless defined $comment->[C_done]; # must be defined + next if $comment->[C_done] || $comment->[C_body_fixed]; + + # where this comment comes from + $comment->[C_source] = $data->{hostname} + if $was_external_user{$comment->[C_remote_posterid]}; + + # basic mappings + $comment->[C_local_posterid] = $identity_map{$comment->[C_remote_posterid]}+0; + $comment->[C_local_jitemid] = $jitemid_map{$comment->[C_remote_jitemid]}+0; + $comment->[C_entry_source] = $entry_source{$comment->[C_local_jitemid]}; + + # remap content (user links) then remove embeds/templates + my $body = $class->remap_lj_user( $data, $comment->[C_body] ); + $body =~ s/<.+?-embed-.+?>/[Embedded content removed during import.]/g; + $body =~ s/<.+?-template-.+?>/[Templated content removed during import.]/g; + $comment->[C_body] = $body; + + # now let's do some encoding, just in case the input we get is in some other + # character encoding + $comment->[C_body] = encode_utf8( $comment->[C_body] || '' ); + $comment->[C_subject] = encode_utf8( $comment->[C_subject] || '' ); + + # this body is done + $comment->[C_body_fixed] = 1; + } + + # variable setup for the database work + my @to_import = sort { ( $a->[C_orig_id]+0 ) <=> ( $b->[C_orig_id]+0 ) } + grep { defined $_->[C_done] && $_->[C_done] == 0 && $_->[C_body_fixed] == 1 } + values %meta; + + # This loop should never need to run through more than once + # but, it will *if* for some reason a comment comes before its parent + # which *should* never happen, but I'm handling it anyway, just in case. + $title->( 'posting %d comments', scalar( @to_import ) ); + + # let's do some batch loads of the users and entries we're going to need + my ( %jitemids, %userids ); + foreach my $comment ( @to_import ) { + $jitemids{$comment->[C_local_jitemid]} = 1; + $userids{$comment->[C_local_posterid]} = 1 + if defined $comment->[C_local_posterid]; + } + DW::Worker::ContentImporter::Local::Comments->precache( $u, [ keys %jitemids ], [ keys %userids ] ); + + # now doing imports! + foreach my $comment ( @to_import ) { + next if $comment->[C_done]; + + # status output update + $title->( 'posting %d/%d comments [%d]', $comment->[C_orig_id], $server_max_id, scalar( @to_import ) ); + $log->( "Attempting to import remote id %d, parentid %d, state %s.", + $comment->[C_orig_id], $comment->[C_remote_parentid], $comment->[C_state] ); + + # if this comment already exists, we might need to update it, however + my $err = ""; + if ( my $jtalkid = $jtalkid_map->{$comment->[C_orig_id]} ) { + $log->( 'Comment already exists, passing to updater.' ); + + $comment->[C_local_parentid] = $jtalkid_map->{$comment->[C_remote_parentid]}+0; + $comment->[C_id] = $jtalkid; + + DW::Worker::ContentImporter::Local::Comments->update_comment( $u, hashify( $comment ), \$err ); + $log->( 'ERROR: %s', $err ) if $err; + + $comment->[C_done] = 1; + next; + } + + # due to the ordering, by the time we're here we should be guaranteed to have + # our parent comment. if we don't, bail out on this comment and mark it as done. 
+ if ( $comment->[C_remote_parentid] && !defined $comment->[C_local_parentid] ) { + my $lpid = $jtalkid_map->{$comment->[C_remote_parentid]}; + unless ( defined $lpid ) { + $log->( 'ERROR: Failed to map remote parent %d.', $comment->[C_remote_parentid] ); + next; + } + $comment->[C_local_parentid] = $lpid+0; + } else { + $comment->[C_local_parentid] = 0; # top level + } + $log->( 'Remote parent %d is local parent %d for orig_id=%d.', + $comment->[C_remote_parentid], $comment->[C_local_parentid], $comment->[C_orig_id] ) + if $comment->[C_remote_parentid]; + + # if we get here we're good to insert into the database + my $talkid = DW::Worker::ContentImporter::Local::Comments->insert_comment( $u, hashify( $comment ), \$err ); + if ( $talkid ) { + $log->( 'Successfully imported remote id %d to new jtalkid %d.', $comment->[C_orig_id], $talkid ); + } else { + $log->( 'Failed to import comment %d: %s.', $comment->[C_orig_id], $err ); + return $temp_fail->( 'Failure importing comment: %s.', $err ); + } + + # store this information + $jtalkid_map->{$comment->[C_orig_id]} = $talkid; + $comment->[C_id] = $talkid; + $comment->[$_] = undef # free up some memory + foreach ( C_props, C_body, C_subject ); + $comment->[C_done] = 1; + } + + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); + }; # body handling section now my ( $lastid, $curid, $lastprop, @tags ) = ( 0, 0, undef ); @@ -242,9 +405,9 @@ my %temp = ( @_ ); # take the rest into our humble hash if ( $lasttag eq 'comment' ) { # get some data on a comment - $curid = $temp{id}; - $meta{$curid}{parentid} = $temp{parentid}+0; - $meta{$curid}{jitemid} = $temp{jitemid}+0; + $curid = $temp{id}+0; + $meta{$curid}->[C_remote_parentid] = $temp{parentid}+0; + $meta{$curid}->[C_remote_jitemid] = $temp{jitemid}+0; } elsif ( $lasttag eq 'property' ) { $lastprop = $temp{name}; } @@ -254,6 +417,10 @@ my $tag = pop @tags; $lasttag = $tags[0]; $lastprop = undef; + if ( $curid ) { + $meta{$curid}->[C_done] = 0 + unless defined $meta{$curid}->[C_done]; + } }; my $body_content = sub { # this grabs data inside of comments: body, subject, date, properties @@ -263,9 +430,10 @@ # that may or may not be in the data stream, and we won't know until we've already gotten # some data if ( $lasttag =~ /(?:body|subject|date)/ ) { - $meta{$curid}{$lasttag} .= $_[1]; + my $arrid = { body => 5, subject => 6, date => 7 }->{$lasttag}; + $meta{$curid}->[$arrid] .= $_[1]; } elsif ( $lastprop && $lasttag eq 'property' ) { - $meta{$curid}{props}{$lastprop} .= $_[1]; + $meta{$curid}->[C_props]->{$lastprop} .= $_[1]; } }; @@ -306,8 +474,8 @@ # reset all text so we don't get it double posted foreach my $cmt ( values %meta ) { - delete $cmt->{$_} - foreach qw/ subject body date props /; + $cmt->[$_] = undef + foreach ( C_subject, C_body, C_date, C_props ); } # and now filter. note that we're assuming this is ISO-8859-1, as that's a @@ -325,142 +493,15 @@ # this will fail nicely as soon as some site we're importing from reduces the max items # they return due to load. http://community.livejournal.com/changelog/5907095.html $lastid += $COMMENTS_FETCH_BODY; + + # now we've got some body text, try to post these comments. if we can do that, we can clear + # them from memory to reduce how much we're storing. + $post_comments->(); } - # now iterate over each comment and build the nearly final structure - foreach my $comment ( values %meta ) { - - # if we weren't able to map to a jitemid (last entry import a while ago?) 
- # or some other problem, log it and bail - unless ( $jitemid_map->{$comment->{jitemid}} ) { - $comment->{skip} = 1; - $log->( 'NO MAPPED ENTRY: remote values: jitemid %d, posterid %d, jtalkid %d.', - $comment->{jitemid}, $comment->{posterid}, $comment->{id} ); - next; - } - - $comment->{source} = $data->{hostname} - if $was_external_user->{$comment->{posterid}}; - - # basic mappings - $comment->{posterid} = $identity_map->{$comment->{posterid}}; - $comment->{jitemid} = $jitemid_map->{$comment->{jitemid}}; - $comment->{orig_id} = $comment->{id}; - - $comment->{entry_source} = $entry_source->{$comment->{jitemid}}; - - # unresolved comments means we haven't got the parent in the database - # yet so we can't post this one - $comment->{unresolved} = 1 - if $comment->{parentid}; - - # the reverse of unresolved, tell the parent it has visible children - $meta{$comment->{parentid}}->{has_children} = 1 - if exists $meta{$comment->{parentid}} && - $comment->{parentid} && $comment->{state} ne 'D'; - - # remap content (user links) then remove embeds/templates - my $body = $class->remap_lj_user( $data, $comment->{body} ); - $body =~ s/<.+?-embed-.+?>/[Embedded content removed during import.]/g; - $body =~ s/<.+?-template-.+?>/[Templated content removed during import.]/g; - $comment->{body} = $body; - - # now let's do some encoding, just in case the input we get is in some other - # character encoding - $comment->{body} = encode_utf8( $comment->{body} || '' ); - $comment->{subject} = encode_utf8( $comment->{subject} || '' ); - } - - # variable setup for the database work - my @to_import = sort { ( $a->{id}+0 ) <=> ( $b->{id}+0 ) } values %meta; - my $had_unresolved = 1; - - # This loop should never need to run through more than once - # but, it will *if* for some reason a comment comes before its parent - # which *should* never happen, but I'm handling it anyway, just in case. - $title->( 'posting %d comments', scalar( @to_import ) ); - while ( $had_unresolved ) { - - # variables, and reset - my ( $ct, $ct_unresolved ) = ( 0, 0 ); - $had_unresolved = 0; - - # now doing imports! - foreach my $comment ( @to_import ) { - next if $comment->{skip}; - - $title->( 'posting %d/%d comments', $comment->{orig_id}, scalar( @to_import ) ); - $log->( "Attempting to import remote id %d, parentid %d, state %s.", - $comment->{orig_id}, $comment->{parentid}, $comment->{state} ); - - # rules we might skip a content with - next if $comment->{done}; # Skip this comment if it was already imported this round - - # if this comment already exists, we might need to update it, however - my $err = ""; - if ( my $jtalkid = $jtalkid_map->{$comment->{orig_id}} ) { - $log->( 'Comment already exists, passing to updater.' ); - - $comment->{id} = $jtalkid; - DW::Worker::ContentImporter::Local::Comments->update_comment( $u, $comment, \$err ); - $log->( 'ERROR: %s', $err ) if $err; - - $comment->{done} = 1; - next; - } - - # now we know this one is going in the database - $ct++; - - # try to resolve - if ( $comment->{unresolved} ) { - # lets see if this is resolvable at the moment - # A resolvable comment is a comment that's parent is already in the DW database - # and an unresolved comment is a comment that has a parent that is currently not in the database. 
- if ( $jtalkid_map->{$comment->{parentid}} ) { - $comment->{parentid} = $jtalkid_map->{$comment->{parentid}}; - $comment->{unresolved} = 0; - - $log->( 'Resolved unresolved comment to local parentid %d.', - $comment->{parentid} ); - - } else { - # guess we couldn't resolve it :( next pass! - $ct_unresolved++; - $had_unresolved = 1; - - $log->( 'Failed to resolve comment.' ); - - next; - } - } - - # if we get here we're good to insert into the database - my $talkid = DW::Worker::ContentImporter::Local::Comments->insert_comment( $u, $comment, \$err ); - if ( $talkid ) { - $log->( 'Successfully imported source %d to new jtalkid %d.', $comment->{id}, $talkid ); - } else { - $log->( 'Failed to import comment %d: %s.', $comment->{id}, $err ); - return $temp_fail->( 'Failure importing comment: %s.', $err ); - } - - # store this information - $jtalkid_map->{$comment->{orig_id}} = $talkid; - $comment->{id} = $talkid; - $comment->{done} = 1; - } - - # sanity check. this happens from time to time when, for example, a comment - # is deleted but the chain of comments underneath it is never actually removed. - # given that the codebase doesn't use foreign keys and transactions, this can - # happen and we have to deal with it gracefully. log it. - if ( $ct == $ct_unresolved && $had_unresolved ) { - $log->( 'WARNING: User had %d unresolvable comments.', $ct_unresolved ); - - # set this to false so that we fall out of the main loop. - $had_unresolved = 0; - } - } + # now we have the final post loop... + $post_comments->(); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); return $ok->(); } diff -r fb24395b0f38 -r ea5fc2c10527 cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm --- a/cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm Thu Dec 29 06:52:33 2011 +0000 +++ b/cgi-bin/DW/Worker/ContentImporter/LiveJournal/Entries.pm Thu Dec 29 07:00:44 2011 +0000 @@ -8,7 +8,7 @@ # Andrea Nall <anall@andreanall.com> # Mark Smith <mark@dreamwidth.org> # -# Copyright (c) 2009 by Dreamwidth Studios, LLC. +# Copyright (c) 2009-2011 by Dreamwidth Studios, LLC. # # This program is free software; you may redistribute it and/or modify it under # the same terms as Perl itself. 
For a copy of the license, please reference @@ -27,6 +27,7 @@ # VITALLY IMPORTANT THAT THIS IS CLEARED BETWEEN JOBS %DW::Worker::ContentImporter::LiveJournal::MAPS = (); + LJ::start_request(); my ( $class, $job ) = @_; my $opts = $job->arg; @@ -48,9 +49,9 @@ my ( $class, $job, $opts, $data ) = @_; my $begin_time = [ gettimeofday() ]; - # we know that we can potentially take a while, so budget an hour for + # we know that we can potentially take a while, so budget some hours for # the import job before someone else comes in to snag it - $job->grabbed_until( time() + 3600 ); + $job->grabbed_until( time() + 3600*12 ); $job->save; # failure wrappers for convenience @@ -85,6 +86,7 @@ my $u = LJ::load_userid( $data->{userid} ) or return $fail->( 'Unable to load target with id %d.', $data->{userid} ); $log->( 'Import begun for %s(%d).', $u->user, $u->userid ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # title munging my $title = sub { @@ -98,10 +100,12 @@ # load entry map my $entry_map = DW::Worker::ContentImporter::Local::Entries->get_entry_map( $u ) || {}; $log->( 'Loaded entry map with %d entries.', scalar( keys %$entry_map ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # and xpost map my $xpost_map = $class->get_xpost_map( $u, $data ) || {}; $log->( 'Loaded xpost map with %d entries.', scalar( keys %$xpost_map ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # this is a helper sub that steps a MySQL formatted time by some offset # arguments: '2008-01-01 12:03:53', -1 ... returns '2008-01-01 12:03:52' @@ -135,11 +139,12 @@ # now we can mark this, as we have officially syncd this time $tried_syncs{$lastsync}++; - $title->( 'syncitems - %d left', $hash->{title} ); + $title->( 'syncitems - %d left', $hash->{total} ); $log->( ' retrieved %d items and %d left to sync', $hash->{count}, $hash->{total} ); last if $hash->{count} == $hash->{total}; } $log->( 'Syncitems finished with %d items pre-prune.', scalar( keys %sync ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # this is an optimization. since we never do an edit event (only post!) we will # never get changes anyway. so let's remove from the list of things to sync any @@ -167,6 +172,7 @@ delete $sync{$1 >> 8}; } $log->( 'Syncitems now has %d items post-prune (first pass).', scalar( keys %sync ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); # this is another optimization. we know crossposted entries can be removed from # the list of things we will import, as we generated them to begin with. 
@@ -174,6 +180,7 @@ delete $sync{$itemid}; } $log->( 'Syncitems now has %d items post-prune (second pass).', scalar( keys %sync ) ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); $title->( 'post-prune' ); @@ -283,6 +290,10 @@ my ( $ok, $res ) = DW::Worker::ContentImporter::Local::Entries->post_event( $data, $entry_map, $u, $posteru, $evt, \@item_errors ); + # we don't need this text anymore, so nuke it to try to save memory + delete $evt->{event}; + delete $evt->{subject}; + # now record any errors that happened $status->( remote_url => $evt->{url}, @@ -409,6 +420,7 @@ # log some status for later $log->( ' counted %d entries, lastgrab is now %s.', $count, $lastgrab ); + $log->( 'memory usage is now %dMB', LJ::gtop()->proc_mem($$)->resident/1024/1024 ); } # mark the comments mode as ready to schedule diff -r fb24395b0f38 -r ea5fc2c10527 cgi-bin/DW/Worker/ContentImporter/Local/Comments.pm --- a/cgi-bin/DW/Worker/ContentImporter/Local/Comments.pm Thu Dec 29 06:52:33 2011 +0000 +++ b/cgi-bin/DW/Worker/ContentImporter/Local/Comments.pm Thu Dec 29 07:00:44 2011 +0000 @@ -8,7 +8,7 @@ # Andrea Nall <anall@andreanall.com> # Mark Smith <mark@dreamwidth.org> # -# Copyright (c) 2009 by Dreamwidth Studios, LLC. +# Copyright (c) 2009-2011 by Dreamwidth Studios, LLC. # # This program is free software; you may redistribute it and/or modify it under # the same terms as Perl itself. For a copy of the license, please reference @@ -18,6 +18,8 @@ package DW::Worker::ContentImporter::Local::Comments; use strict; +our ( $EntryCache, $UserCache ); + =head1 NAME DW::Worker::ContentImporter::Local::Comments - Local data utilities for comments @@ -26,28 +28,61 @@ These functions are part of the Saving API for comments. +=head2 C<< $class->clear_caches() >> + +This needs to be called between each import. This ensures we clear out the local caches +so we don't bleed data from one import to the next. + +=cut + +sub clear_caches { + $EntryCache = undef; + $UserCache = undef; +} + +=head2 C<< $class->precache( $u, $jitemid_hash, $userid_hash ) >> + +Given a user and two hashrefs (keys are jitemids and then userids respectively), this +will do a bulk load of those items and precache them. This is designed to be used +right before we import a bunch of comments to give us some performance and save all +the roundtrips. + +=cut + +sub precache { + my ( $class, $u, $jitemids, $userids ) = @_; + + $UserCache = LJ::load_userids( @$userids ); + + foreach my $jitemid ( @$jitemids ) { + $EntryCache->{$jitemid} = LJ::Entry->new( $u, jitemid => $jitemid ); + } + LJ::Entry->preload_props_all(); +} + =head2 C<< $class->get_comment_map( $user, $hashref ) >> -Returns a hashref mapping import_source keys to jtalkids +Returns a hashref mapping import_source keys to jtalkids. This really shouldn't +fail or we get into awkward duplication states. =cut sub get_comment_map { my ( $class, $u ) = @_; - my $p = LJ::get_prop( "talk", "import_source" ); - return {} unless $p; + my $p = LJ::get_prop( "talk", "import_source" ) + or die "Failed to load import_source property."; + my $dbr = LJ::get_cluster_reader( $u ) + or die "Failed to get database reader for user."; + my $sth = $dbr->prepare( "SELECT jtalkid, value FROM talkprop2 WHERE journalid = ? AND tpropid = ?" 
) + or die "Failed to allocate statement handle."; + $sth->execute( $u->id, $p->{id} ) + or die "Failed to execute query."; - my $dbr = LJ::get_cluster_reader( $u ); my %map; - my $sth = $dbr->prepare( "SELECT jtalkid, value FROM talkprop2 WHERE journalid = ? AND tpropid = ?" ); - - $sth->execute( $u->id, $p->{id} ); - while ( my ( $jitemid, $value ) = $sth->fetchrow_array ) { $map{$value} = $jitemid; } - return \%map; } @@ -115,9 +150,10 @@ $errref ||= ''; # load the data we need to make this comment - my $jitem = LJ::Entry->new( $u, jitemid => $cmt->{jitemid} ); - my $source = ( $cmt->{entry_source} || $jitem->prop( "import_source" ) ) . "?thread=" . ( $cmt->{id} << 8 ); - my $user = $cmt->{posterid} ? LJ::load_userid( $cmt->{posterid} ) : undef; + my $jitem = $EntryCache->{$cmt->{jitemid}} || + LJ::Entry->new( $u, jitemid => $cmt->{jitemid} ); + my $source = ( $cmt->{entry_source} || $jitem->prop( "import_source" ) ) . "?thread=" . ( $cmt->{orig_id} << 8 ); + my $user = $cmt->{posterid} ? ( $UserCache->{$cmt->{posterid}} || LJ::load_userid( $cmt->{posterid} ) ) : undef; # fix the XML timestamp to a useful timestamp my $date = $cmt->{date}; @@ -183,7 +219,7 @@ =head1 COPYRIGHT AND LICENSE -Copyright (c) 2009 by Dreamwidth Studios, LLC. +Copyright (c) 2009-2011 by Dreamwidth Studios, LLC. This program is free software; you may redistribute it and/or modify it under the same terms as Perl itself. For a copy of the license, please reference diff -r fb24395b0f38 -r ea5fc2c10527 cgi-bin/LJ/Entry.pm --- a/cgi-bin/LJ/Entry.pm Thu Dec 29 06:52:33 2011 +0000 +++ b/cgi-bin/LJ/Entry.pm Thu Dec 29 07:00:44 2011 +0000 @@ -383,6 +383,27 @@ } } +# method for preloading props into all outstanding singletons that haven't already +# loaded properties. +sub preload_props_all { + foreach my $uid ( keys %singletons ) { + my $hr = $singletons{$uid}; + + my @load; + foreach my $jid ( keys %$hr ) { + next if $hr->{$jid}->{_loaded_props}; + push @load, $jid; + } + + my $props = {}; + LJ::load_log_props2( $uid, \@load, $props ); + foreach my $jid ( keys %$props ) { + $hr->{$jid}->{props} = $props->{$jid}; + $hr->{$jid}->{_loaded_props} = 1; + } + } +} + # returns array of tags for this post sub tags { my $self = $_[0]; diff -r fb24395b0f38 -r ea5fc2c10527 cgi-bin/LJ/Talk.pm --- a/cgi-bin/LJ/Talk.pm Thu Dec 29 06:52:33 2011 +0000 +++ b/cgi-bin/LJ/Talk.pm Thu Dec 29 07:00:44 2011 +0000 @@ -88,6 +88,7 @@ { my ( $pics, $id, $extra ) = @_; return unless defined $id && defined $pics->{pic}->{$id}; + $extra = '' unless defined $extra; my $p = $pics->{pic}->{$id}; return "<img src='$LJ::IMGPREFIX/talk/$p->{img}' border='0' ". 
diff -r fb24395b0f38 -r ea5fc2c10527 cgi-bin/LJ/User.pm --- a/cgi-bin/LJ/User.pm Thu Dec 29 06:52:33 2011 +0000 +++ b/cgi-bin/LJ/User.pm Thu Dec 29 07:00:44 2011 +0000 @@ -208,13 +208,18 @@ $u->set_prop("adult_content", $opts{journal_adult_settings}) if LJ::is_enabled( 'adult_content' ); $u->set_default_style; - if ( my $remote = LJ::get_remote() ) { - LJ::set_rel($u, $remote, "A"); # maintainer - LJ::set_rel($u, $remote, "M") if $opts{moderated}; # moderator if moderated - $remote->join_community( $u, 1, 1 ); # member - - $u->set_comm_settings( $remote, { membership => $opts{membership}, - postlevel => $opts{postlevel} } ); + my $admin = LJ::load_userid( $opts{admin_userid} ) + if $opts{admin_userid}; + $admin ||= LJ::get_remote(); + + if ( $admin ) { + LJ::set_rel($u, $admin, "A"); # maintainer + LJ::set_rel($u, $admin, "M") if $opts{moderated}; # moderator if moderated + $admin->join_community( $u, 1, 1 ); # member + + $u->set_comm_settings( $admin, { membership => $opts{membership}, + postlevel => $opts{postlevel} } ) + if exists $opts{membership} && exists $opts{postlevel}; } return $u; } diff -r fb24395b0f38 -r ea5fc2c10527 cgi-bin/LJ/Web.pm --- a/cgi-bin/LJ/Web.pm Thu Dec 29 06:52:33 2011 +0000 +++ b/cgi-bin/LJ/Web.pm Thu Dec 29 07:00:44 2011 +0000 @@ -54,7 +54,7 @@ my $type = shift; # either "" or "input" my $attr = shift; - my ( $attrs, $alt, $ssl ) = ( '', '' ); + my ( $attrs, $alt, $ssl ) = ( '', '', 0 ); if ( $attr ) { if ( ref $attr eq "HASH" ) { if ( exists $attr->{alt} ) { --------------------------------------------------------------------------------