[dw-free] Found the original checkin of SPUD from 2004 'wcmtools' CVS repository. Bring a copy of this into our repository.
[commit: http://hg.dwscoalition.org/dw-free/rev/d62f415767a4]
Found the original checkin of SPUD from 2004 'wcmtools' CVS repository.
Bring a copy of this into our repository as it may be the most useful way of
monitoring statistics on mogile/perlbal/memcache/etc. We'll see.
Patch by mark.
Files modified:
Found the original checkin of SPUD from 2004 'wcmtools' CVS repository.
Bring a copy of this into our repository as it may be the most useful way of
monitoring statistics on mogile/perlbal/memcache/etc. We'll see.
Patch by ![[staff profile]](https://www.dreamwidth.org/img/silk/identity/user_staff.png) mark.
Files modified:
- src/spud/bin/cmdshell
- src/spud/bin/gatherer
- src/spud/bin/plugins/config_generator.pl
- src/spud/bin/plugins/memcached.pl
- src/spud/bin/plugins/mogilefsd.pl
- src/spud/bin/plugins/mogstored.pl
- src/spud/bin/plugins/perlbal.pl
- src/spud/bin/plugins/test.pl
- src/spud/bin/replicator
- src/spud/bin/rrd-storage
- src/spud/bin/server
- src/spud/bin/wrapper
- src/spud/conf/authorized_keys
- src/spud/conf/cmdshell.conf
- src/spud/conf/sshd_config
- src/spud/conf/stats-local.conf
- src/spud/conf/stats.conf
- src/spud/doc/README.txt
-------------------------------------------------------------------------------- diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/cmdshell --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/cmdshell Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,100 @@ +#!/usr/bin/perl +# +# Danga's Command Shell +# Glue that takes incoming connection on STDIN/STDOUT and forwards it to a +# local server running on a certain port. This is meant to be an account's +# shell in /etc/passwd. +# +# Configuration file format: +# +# <command> = <server> : <port> +# +# Caveats & Warnings: +# This program must be in a bin directory. E.g. /home/lj/bin/cmdshell. It +# will extrapolate the location of its config file as the portion +# before the bin/ plus conf/cmdshell.conf. So, for the example above, +# /home/lj/conf/cmdshell.conf is expected to exist. +# +# Copyright 2004, Danga Interactive +# +# Authors: +# Mark Smith <marksmith@danga.com> +# +# License: +# undecided. +# + +# uses +use strict; +use IO::Socket; + +# known commands +my %commands; + +# try to guess the config file location? +if ($0 =~ m!^(.+)/bin/cmdshell$!) { + my $conf = "$1/conf/cmdshell.conf"; + die "Config file $conf not found." 
unless -e $conf; + + # open and parse + open FILE, "<$conf" + or die "Unable to open config file $conf: $!\n"; + while (my $line = <FILE>) { + if ($line =~ /^\s*([^#].*)\s*=\s*(.*)\s*/) { + my ($l, $r) = (trim($1), trim($2)); + if ($r =~ /^(.+)\s*:\s*(\d+)/) { + my ($host, $port) = (trim($1), trim($2)+0); + die "Invalid port '$2' for command $l in '$line'.\n" unless $port; + $commands{$l} = [ $host, $port ]; + } else { + die "Invalid config file line: $line\n"; + } + } + } + close FILE; +} else { + die "Unable to guess config file based off of path: $0\n"; +} + +# we're running, verify we have info +my $user = $ENV{LJUSERNAME}; +my $cmd = lc($ENV{LJCOMMAND}); +$cmd = '' unless $cmd =~ /^golive|replicator$/; +die "No information for command '$cmd'\n" unless $user && $cmd; + +# now based on command, open connection to port +my $socket = IO::Socket::INET->new(Proto => 'tcp', + PeerAddr => $commands{$cmd}->[0], + PeerPort => $commands{$cmd}->[1],) + or die "Unable to connect to command server ($commands{$cmd}->[0]:$commands{$cmd}->[1])\n"; +$socket->autoflush(1); + +# parent pid +my $ppid = $$; + +# now fork +my $kidpid; +die "Unable to fork: $!\n" unless defined($kidpid = fork()); + +# basic parts that handle reading/writing from the two sides +if ($kidpid) { + # parent + while (defined (my $line = <$socket>)) { + print STDOUT $line; + } + kill("TERM" => $kidpid); # death to the children +} else { + # child + while (defined (my $line = <STDIN>)) { + print $socket $line; + } + kill("TERM" => $ppid); # kill parent since we died +} + +# little trimming sub +sub trim { + my $res = shift; + $res =~ s/^\s+//; + $res =~ s/\s+$//; + return $res; +} diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/gatherer --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/gatherer Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,440 @@ +#!/usr/bin/perl +# +# Danga's Statistics Gatherer +# Gathers statistics using plugins. 
+# +# Command line options: +# +# --conf STRING set what config file to use for options +# --server STRING set what server to point at +# --port INT the port of the server to use +# --foreground if present, tell the server to force to the foreground +# +# Configuration file format: +# +# server = STRING location of statistics server +# port = INT port number server listens on +# +# job: plugin(parameter, parameter) +# job2: plugin +# helperplugin(parameter, parameter) +# ... +# +# Copyright 2004, Danga Interactive +# +# Authors: +# Mark Smith <marksmith@danga.com> +# +# License: +# undecided. +# + +use strict; +use lib "$ENV{LJHOME}/cgi-bin"; +use Getopt::Long; +use POSIX qw(:sys_wait_h); +use Danga::Daemon; + +# config +my $path = "$ENV{LJHOME}/bin/spud"; +my $plugins_path = "$path/plugins"; + +# command line config +my %opts; +GetOptions \%opts, qw/ conf=s server=s port=i /; + +# reverse mapping; ( pid => job-name ) +my %pids; + +# mapping of what we expect to die soon +my %todie; + +# called if we die +sub fail { + foreach my $pid (keys %pids) { + kill 15, $pid; + } + $_[0] ||= 'no reason provided'; + die "shutdown detected: $_[0]\n"; +} + +# now daemonize +Danga::Daemon::daemonize( + \&worker, + { + interval => 1, + listenport => 13500, + chdir => $path, + shutdowncode => \&fail, + }, +); + +# ( plugin-name => +# { +# modtime => int, # last modified time of plugin file (for reload detection) +# children => [ pid, pid, pid, ... ], # pids of workers doing this plugin +# file => str, # filename +# reload => bool, # if on, reload_plugins() will reload this plugin +# registered => bool, # if on, this plugin has registered correctly and is good to go +# code => { +# register => sub { ... }, +# worker => sub { ... }, +# }, +# } +# ); +my %plugins; + +# ( job-name => +# { +# plugin => plugin-name, +# respawn_with => plugin-name, +# pid => pid, +# options => [ option, option, option, ... 
], # list of options from config file +# reload => bool, # if on, reload_plugins() will reload this job +# active => bool, # if on, this is active per the config file +# } +# ); +my %jobs; + +# array of plugins that want to help us out; array of [ pluginname, [ opts ] ] +my @helpers; + +# cached socket to our stat server +my $sock; + +sub set { + # sends a statistic up to the stat server + $sock ||= IO::Socket::INET->new(PeerAddr => $opts{server}, + PeerPort => $opts{port},); + fail("can't create socket: $!") + unless $sock; + + # send this on up to the server + while (scalar(@_) >= 2) { + $sock->print("QSET $_[0] $_[1]\r\n"); + shift @_; shift @_; + } +} + +# keeps track of whether or not this is the first worker run +sub worker { + # step 1: check through plugins directory to see what plugins have changed (or are new) + check_plugin_modtimes(); + + # step 2: check for a changed config file and load/reload as necessary + check_config(); + + # step 3: now that we know exactly what's changed, call master reloader + reload(); + + # step 4: run through any defined helpers so they can, well, help + foreach my $helpref (@helpers) { + my ($plugin, $opts) = @$helpref; + next unless $plugins{$plugin} && $plugins{$plugin}->{registered} && + $plugins{$plugin}->{code}->{helper}; + eval { + $plugins{$plugin}->{code}->{helper}->($opts); + }; + debug($@) if $@; + } +} + +sub check_plugin_modtimes { + opendir DIR, $plugins_path + or fail("Unable to open plugins directory for reading"); + foreach my $file (readdir(DIR)) { + next if $file =~ /^\./; + next unless $file =~ /^(.+)\.pl$/; + my $plugin = $1; + + # create an empty record if this is the first time we've found this plugin + $plugins{$plugin} ||= { + modtime => 0, + reload => 1, + registered => 0, + children => [], + file => "$plugins_path/$file", + }; + + # compare modtime and mark for reload if necessary + my $mt = (stat($plugins{$plugin}->{file}))[9]; + $plugins{$plugin}->{reload} = 1 + if $mt > 
$plugins{$plugin}->{modtime}; + $plugins{$plugin}->{modtime} = $mt; + } + closedir DIR; +} + +sub check_config { + fail("Config file not found") + unless -e $opts{conf}; + my $mt = (stat($opts{conf}))[9]; + $opts{config_modtime} ||= 0; + reload_config() + if $mt > $opts{config_modtime}; + $opts{config_modtime} = $mt; +} + +sub reload_config { + debug(""); + debug("configuration file reloading"); + + # we mark all jobs as inactive, so they get marked as active below + foreach (keys %jobs) { + $jobs{$_}->{active} = 0 + unless $jobs{$_}->{respawn_with}; + } + + # clear out all helpers, as they should get readded. they aren't in + # separate threads so it doesn't matter if we readd them every time. + @helpers = (); + + open FILE, "<$opts{conf}" + or fail("Unable to open config file: $!"); + foreach my $line (<FILE>) { + # ignore comments and clean surrounding whitespace + next if $line =~ /^\s*#/; + $line =~ s/^\s+//; + $line =~ s/[\s\r\n]+$//; + next unless length $line; + + # shortcut; set some options + if ($line =~ /^(\w+)\s*=\s*(.+)$/) { + $opts{$1} = $2; + next; + } + + # extract any options contained in (...) and going to the end of the line + my $optionstr; + if ($line =~ s/(?:\((.*)\))$//) { + $optionstr = $1; + } + my @options = + map { (s/^\s+//, s/\s$//, 1) ? 
$_ : undef } # now trim whitespace front and back + split(/,/, $optionstr); # split option string on commas + + # now see if it has a job + plugin left, or just a plugin + if ($line =~ /^([-\w:.]+)\s*:\s*(\w+)$/) { + # this is a job definition + my ($job, $plugin) = ($1, $2); + fail("Error adding $job to job list") + unless add_job($job, $plugin, \@options); + + } elsif ($line =~ /^(\w+)$/) { + # this is just a helper plugin + fail("Plugin $1 not defined") + unless $plugins{$1}; + + # push name of plugin on helper list + debug("helper from plugin $1 added"); + push @helpers, [ $1, \@options ]; + + } else { + fail("Unknown format: $line($optionstr)"); + } + } + close FILE; + debug("configuration file reloaded"); +} + +# main processor that goes through everything we have and reloads as necessary. this +# also handles reaping our children. +sub reload { + # iterate over any dead children we might have picked up + while ((my $pid = waitpid(-1, WNOHANG)) > 0) { + if (my $job = delete($pids{$pid})) { + if ($jobs{$job}->{active}) { + debug("[$job] dead; pid = $pid; marking to reload; unexpected!"); + $jobs{$job}->{reload} = 1; + $jobs{$job}->{pid} = 0; + } else { + debug("[$job] dead; pid = $pid; inactive job, NOT reloading"); + delete $jobs{$job}; + } + } else { + if (delete $todie{$pid}) { + debug("child death; pid = $pid; expected death, already reloaded"); + } else { + debug("ERROR: $pid died but we have no record of it"); + } + } + } + + # iterate over plugins and reload as necessary + foreach my $plugin (sort keys %plugins) { + next unless $plugins{$plugin}->{reload}; + debug("reloading plugin: $plugin"); + + # now require the file + my $file = $plugins{$plugin}->{file}; + unless (my $ret = do $file) { + if ($@) { + warn "couldn't parse $file: $@\n"; + } elsif (! 
defined $ret) { + warn "couldn't do $file: $!\n"; + } else { + warn "couldn't run $file\n"; + } + next; + } + + # now mark any jobs with this plugin to reload + foreach my $job (keys %jobs) { + $jobs{$job}->{reload} = 1 + if $jobs{$job}->{plugin} eq $plugin || + $jobs{$job}->{respawn_with} eq $plugin; + } + } + + # now that we know all the plugins are loaded, iterate over jobs so we can get + # the plugins spawned and doing something + foreach my $job (sort keys %jobs) { + my $plugin = $plugins{$jobs{$job}->{plugin}}; + fail("can't add job for plugin with no worker code: job = $job; plugin = $jobs{$job}->{plugin}") + unless ref $plugin->{code}->{worker}; + + # see if we need to kill off this job + unless ($jobs{$job}->{active}) { + debug("killing job: $job"); + if ($jobs{$job}->{pid}) { + kill 15, $jobs{$job}->{pid}; + } else { + delete $pids{$jobs{$job}->{pid}}; + delete $jobs{$job}; + } + next; + } + + # now, the following path does a reload of this job if necessary + next unless $jobs{$job}->{reload} && $plugin->{registered}; + debug("reloading job: $job"); + + # kill off this child if we had one + if ($jobs{$job}->{pid}) { + kill 15, $jobs{$job}->{pid}; + delete $pids{$jobs{$job}->{pid}}; + $todie{$jobs{$job}->{pid}} = 1; + debug("[$job] killing child; pid = $jobs{$job}->{pid}"); + $jobs{$job}->{pid} = 0; + } + + # bust out a child for this job + my $pid = fork; + fail("can't fork child: $!") if !defined $pid; + unless ($pid) { + # child path; do some basic setup and then call the worker + $0 .= " [$jobs{$job}->{plugin}: $job]"; + $SIG{INT} = undef; # in case parent is in the foreground + $SIG{TERM} = undef; # no special handling for this + + # call the child which should do all the work and return when it's done + $plugin->{code}->{worker}->($job, $jobs{$job}->{options}); + + # when the above returns, the worker is done, so we exit + exit 0; + } + + # if we get here we're a parent, which means we need to mark this child as + # run and that we don't need to do 
anything more + $jobs{$job}->{pid} = $pid; + $jobs{$job}->{reload} = 0; + $pids{$pid} = $job; + } +} + +# called by plugins to let us know that they want to be active. they have to provide a +# certain set of minimum functionality which we use. we also import some things into +# their namespace. +sub register_plugin { + my ($plugin, $package, $opts) = @_; + return unless $plugin && $package && $plugins{$plugin} && $opts; + + # make sure they gave us enough functions + unless (ref $opts->{register} && (ref $opts->{worker} || ref $opts->{helper})) { + debug("${plugin} did not provide minimum functionality: register and either worker or helper"); + return; + } + + # now create some aliases in their package so they can get to debug and set + eval "*${package}::debug = *main::debug;"; + eval "*${package}::set = *main::set;"; + eval "*${package}::add_job = *main::add_job;"; + eval "*${package}::get_var = *main::get_var;"; + eval "*${package}::mark_inactive_by_plugin = *main::mark_inactive_by_plugin;"; + + # call the plugin's register function so that it knows we've acknowledged its presence + unless ($opts->{register}->()) { + debug("${plugin}::register() didn't return true"); + return; + } + + # done reloading, mark as reloaded (so we don't reload next time) + $plugins{$plugin}->{code} = $opts; + $plugins{$plugin}->{reload} = 0; + $plugins{$plugin}->{registered} = 1; +} + +# called by us and by helpers to add jobs to the list. if called by a plugin, $respawn_with +# must be specified and should be set to the name of the plugin. otherwise, this job will +# die the next time the config file is changed. +sub add_job { + my ($job, $plugin, $options, $respawn_with) = @_; + fail("Bad input to add_job: job = $job, plugin = $plugin") + unless $job && $plugin; + $options ||= []; + + # now print out debugging info + #debug("found config: $job: $plugin(" . join(', ', @$options) . 
")"); + + # make sure this plugin exists + fail("Plugin $plugin not defined") + unless $plugins{$plugin}; + + # default %jobs setup + $jobs{$job} ||= { + plugin => $plugin, + pid => 0, + reload => 1, + options => $options, + respawn_with => $respawn_with, + }; + $jobs{$job}->{active} = 1; # on unconditionally + + # now determine if this job needs reloading + $jobs{$job}->{reload} = 1 unless $jobs{$job}->{pid}; + if (scalar(@$options) == scalar(@{$jobs{$job}->{options}})) { + # compare options one by one, reload if any have changed + for (my $i = 0; $i < scalar(@$options); $i++) { + $jobs{$job}->{reload} = 1 + if $options->[$i] ne $jobs{$job}->{options}->[$i]; + } + } else { + # number of options changed, reload them all + $jobs{$job}->{reload} = 1; + } + + # if reload, copy in new options just in case + if ($jobs{$job}->{reload}) { + @{$jobs{$job}->{options}} = @$options; + } + + return 1; +} + +# called by helpers to mark everything they've spawned as inactive before +# they begin another round of adding jobs. this is basically a way to say +# to the gatherer that a process is dead. if it's not re-added immediately +# by the helper, it gets killed off in the next round of reaping. +sub mark_inactive_by_plugin { + my $plugin = shift; + + foreach my $job (keys %jobs) { + $jobs{$job}->{active} = 0 + if $jobs{$job}->{respawn_with} eq $plugin; + } +} + +# used by plugins to get access to variables set in the config file +sub get_var { + return $opts{$_[0] || ''}; +} diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/plugins/config_generator.pl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/plugins/config_generator.pl Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,113 @@ +# automatic monitoring config generator plugin for LiveJournal. 
to use, add a +# line like this to your SPUD config: +# +# config_generator(mysql, perlbal, memcached, mogstored, mogilefsd) +# +# written by Mark Smith <junior@danga.com> + +package ConfigGenPlugin; + +use strict; + +# called when we're loaded. here we can do anything necessary to set ourselves +# up if we want. in this case we just load the LJ libraries. +sub register { + # load up our livejournal files + use lib "$ENV{LJHOME}/cgi-bin"; + require 'ljlib.pl'; + + # signal success if we get here + return 1; +} + +# this is called and given the job name as the first parameter and an array ref of +# options passed in as the second parameter. +sub helper { + my $options = shift; + + # put options into hashref for easy use later + my %opts; + foreach my $opt (@$options) { + my @parms = split(/\s*=\s*/, $opt); + my $job = shift(@parms); + $opts{$job} = \@parms; + } + + # this is the main loop + LJ::start_request(); + + # mark all of our jobs as being inactive so that if we don't readd them below + # they'll get reaped automatically. + mark_inactive_by_plugin('config_generator'); + + # look for any perlbals that need monitoring jobs + if ($opts{perlbal}) { + while (my ($srvr, $ipaddr) = each %LJ::PERLBAL_SERVERS) { + add_job("perlbal.$srvr", "perlbal", [ $ipaddr, @{$opts{perlbal}} ], 'config_generator'); + } + } + + # and now memcache servers + if ($opts{memcached}) { + foreach my $host (@LJ::MEMCACHE_SERVERS) { + my $ipaddr = ref $host ? 
$host->[0] : $host; + add_job("memcached.$ipaddr", "memcached", [ $ipaddr, @{$opts{memcached}} ], 'config_generator'); + } + } + + # mogilefsd + if ($opts{mogilefsd} && %LJ::MOGILEFS_CONFIG) { + foreach my $ipaddr (@{$LJ::MOGILEFS_CONFIG{hosts}}) { + add_job("mogilefsd.$ipaddr", "mogilefsd", [ $ipaddr, @{$opts{mogilefsd}} ], 'config_generator'); + } + } + + # mogstored + if ($opts{mogstored} && %LJ::MOGILEFS_CONFIG) { + my $mgd = new MogileFS::Admin(hosts => $LJ::MOGILEFS_CONFIG{hosts}); + if ($mgd) { + my (%hosthash, %devhash); + + if (my $hosts = $mgd->get_hosts) { + foreach my $h (@$hosts) { + $hosthash{$h->{hostid}} = $h; + } + } + + if (my $devs = $mgd->get_devices) { + foreach my $d (@$devs) { + $devhash{$d->{devid}} = $d; + } + } + + foreach my $devid (keys %devhash) { + my $host = $hosthash{$devhash{$devid}->{hostid}}; + add_job("mogstored.dev$devid", "mogstored", + [ "http://$host->{hostip}:$host->{http_port}/dev$devid/usage", @{$opts{mogstored}} ], + 'config_generator'); + } + + foreach my $host (values %hosthash) { + my $ipaddr = "$host->{hostip}:7501"; + add_job("mogstored.$ipaddr", "perlbal", [ $ipaddr, @{$opts{perlbal} || []} ], 'config_generator'); + } + } + } + + if ($opts{mysql} || $opts{db} || $opts{database}) { + + } + + # done, call end request and sleep for a while + LJ::end_request(); +} + +# calls the registrar in the main program, giving them information about us. this +# has to be called as main:: or just ::register_plugin because we're in our own +# package and we want to talk to the register function in the main namespace. +main::register_plugin('config_generator', 'ConfigGenPlugin', { + register => \&register, + helper => \&helper, +}); + +1; diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/plugins/memcached.pl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/plugins/memcached.pl Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,63 @@ +# memcache monitoring plugin for SPUD. 
this is a simple plugin that gets stats +# information from memcache and sticks it in the server. +# +# written by Mark Smith <junior@danga.com> + +package MemcachedPlugin; + +use strict; + +# called when we're loaded. here we can do anything necessary to set ourselves +# up if we want. +sub register { + debug("memcached plugin registered"); + return 1; +} + +# this is called and given the job name as the first parameter and an array ref of +# options passed in as the second parameter. +sub worker { + my ($job, $options) = @_; + my $ipaddr = shift(@{$options || []}); + my $interval = shift(@{$options || []}) || 5; + return unless $ipaddr; + + # loop and get statistics every second + my $sock; + my $read_input = sub { + my @out; + while (<$sock>) { + s/[\r\n\s]+$//; + last if /^END/; + push @out, $_; + } + return \@out; + }; + while (1) { + $sock ||= IO::Socket::INET->new(PeerAddr => $ipaddr, Timeout => 3); + return unless $sock; + + # basic states command + print $sock "stats\r\n"; + my $out = $read_input->(); + foreach my $line (@$out) { + if ($line =~ /^STAT\s+([\w:]+)\s+(.+)$/) { + my ($stat, $val) = ($1, $2); + set("$job.$stat", $val); + } + } + + # now sleep some between doing things + sleep $interval; + } +} + +# calls the registrar in the main program, giving them information about us. this +# has to be called as main:: or just ::register_plugin because we're in our own +# package and we want to talk to the register function in the main namespace. +main::register_plugin('memcached', 'MemcachedPlugin', { + register => \&register, + worker => \&worker, +}); + +1; diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/plugins/mogilefsd.pl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/plugins/mogilefsd.pl Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,64 @@ +# mogilefsd monitoring plugin. this looks at the stats which is a very quick +# operation for the mogilefsd server. plans for this plugin are to start +# monitoring replication, recent queries, etc. 
+# +# written by Mark Smith <junior@danga.com> + +package MogilefsdPlugin; + +use strict; + +# called when we're loaded. here we can do anything necessary to set ourselves +# up if we want. +sub register { + debug("mogilefsd plugin registered"); + return 1; +} + +# this is called and given the job name as the first parameter and an array ref of +# options passed in as the second parameter. +sub worker { + my ($job, $options) = @_; + my $ipaddr = shift(@{$options || []}); + my $interval = shift(@{$options || []}) || 5; + return unless $ipaddr; + + # test plugin simply loops and once a second sets a "heartbeat" + my $sock; + my $read_input = sub { + my @out; + while (<$sock>) { + s/[\r\n\s]+$//; + last if /^\./; + push @out, $_; + } + return \@out; + }; + while (1) { + $sock ||= IO::Socket::INET->new(PeerAddr => $ipaddr, Timeout => 3); + return unless $sock; + + # basic states command + print $sock "!stats\r\n"; + my $out = $read_input->(); + foreach my $line (@$out) { + if ($line =~ /^([\w:]+)\s+(.+)$/) { + my ($stat, $val) = ($1, $2); + set("$job.$stat", $val); + } + } + + # now sleep some between doing things + sleep $interval; + } +} + +# calls the registrar in the main program, giving them information about us. this +# has to be called as main:: or just ::register_plugin because we're in our own +# package and we want to talk to the register function in the main namespace. +main::register_plugin('mogilefsd', 'MogilefsdPlugin', { + register => \&register, + worker => \&worker, +}); + +1; diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/plugins/mogstored.pl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/plugins/mogstored.pl Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,74 @@ +# mogstored device monitoring plugin. this fetches the 'usage' file from a device +# and parses it, putting the information in the server. 
+# +# written by Mark Smith <junior@danga.com> + +package MogstoredPlugin; + +# packages we need +use LWP::Simple; +use Time::HiRes qw(gettimeofday tv_interval); + +# called when we're loaded. here we can do anything necessary to set ourselves +# up if we want. +sub register { + debug("mogstored plugin registered"); + return 1; +} + +# this is called and given the job name as the first parameter and an array ref of +# options passed in as the second parameter. +sub worker { + my ($job, $options) = @_; + my $url = shift(@{$options || []}); + my $interval = shift(@{$options || []}) || 60; + return unless $url; + + # get stats every $interval seconds + while (1) { + my $t0 = [ gettimeofday ]; + my $doc = get($url); + my $time = tv_interval($t0); + unless (defined $doc) { + set("$job.status", "fetch_failure"); + sleep $interval; + next; + } + + # split the doc and parse + my %stats; + foreach (split(/\r?\n/, $doc)) { + next unless /^(\w+):\s+(.+)$/; + my ($key, $val) = ($1, $2); + $stats{$key} = $val; + } + + # if we couldn't parse it + unless ($stats{time} && $stats{total} && $stats{used} && $stats{available}) { + set("$job.status", "parse_failure"); + sleep $interval; + next; + } + + # mark this as successfully retrieved + set("$job.status", "success"); + set("$job.time", $stats{time}); + set("$job.used", $stats{used}); + set("$job.available", $stats{available}); + set("$job.total", $stats{total}); + set("$job.delay", sprintf("%5.3f", $time)); + + # sleep a good 60 seconds, as this file doesn't change very often + sleep $interval; + } +} + +# calls the registrar in the main program, giving them information about us. this +# has to be called as main:: or just ::register_plugin because we're in our own +# package and we want to talk to the register function in the main namespace. 
+main::register_plugin('mogstored', 'MogstoredPlugin', { + register => \&register, + worker => \&worker, +}); + +1; diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/plugins/perlbal.pl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/plugins/perlbal.pl Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,65 @@ +# perlbal monitoring plugin. very simple right now, this gets the output of the states +# command and saves it to spud. this is also used on the mogstored sidechannel, which +# is a perlbal management interface. +# +# written by Mark Smith <junior@danga.com> + +package PerlbalPlugin; + +use strict; + +# called when we're loaded. here we can do anything necessary to set ourselves +# up if we want. +sub register { + debug("perlbal plugin registered"); + return 1; +} + +# this is called and given the job name as the first parameter and an array ref of +# options passed in as the second parameter. +sub worker { + my ($job, $options) = @_; + my $ipaddr = shift(@{$options || []}); + my $interval = shift(@{$options || []}) || 5; + return unless $ipaddr; + + # try to get states every second + my $sock; + my $read_input = sub { + my @out; + while (<$sock>) { + s/[\r\n\s]+$//; + last if /^\./; + push @out, $_; + } + return \@out; + }; + while (1) { + $sock ||= IO::Socket::INET->new(PeerAddr => $ipaddr, Timeout => 3); + return unless $sock; + + # basic states command + print $sock "states\r\n"; + my $out = $read_input->(); + foreach my $line (@$out) { + if ($line =~ /^(.+?)\s+(\w+)\s+(\d+)$/) { + my ($class, $state, $count) = ($1, $2, $3); + $class =~ s/^(.+::)//; + set("$job.$class.$state", $count); + } + } + + # now sleep some between doing things + sleep $interval; + } +} + +# calls the registrar in the main program, giving them information about us. this +# has to be called as main:: or just ::register_plugin because we're in our own +# package and we want to talk to the register function in the main namespace. 
+main::register_plugin('perlbal', 'PerlbalPlugin', { + register => \&register, + worker => \&worker, +}); + +1; diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/plugins/test.pl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/plugins/test.pl Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,37 @@ +# test/heartbeat plugin for SPUD statistic gathering system +# written by Mark Smith <junior@danga.com> + +# this is mostly a demonstration of how to create a very simple plugin for SPUD. +# more complex examples can be found elsewhere in the plugins directory. + +# doesn't matter what package you're in +package TestPlugin; + +# called when we're loaded. here we can do anything necessary to set ourselves +# up if we want. +sub register { + debug("test plugin registered"); + return 1; +} + +# this is called and given the job name as the first parameter and an array ref of +# options passed in as the second parameter. +sub worker { + my ($job, $options) = @_; + + # test plugin simply loops and once a second sets a "heartbeat" + while (1) { + set("test.$job" => 1); + sleep 1; + } +} + +# calls the registrar in the main program, giving them information about us. this +# has to be called as main:: or just ::register_plugin because we're in our own +# package and we want to talk to the register function in the main namespace. +main::register_plugin('test', 'TestPlugin', { + register => \&register, + worker => \&worker, +}); + +1; diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/replicator --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/replicator Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,224 @@ +#!/usr/bin/perl +# +# Danga's Statistics Server Replicator +# Very lightweight program that replicates data from one statistic server (subscribe *) +# to another one. Very, very simple and lightweight. Uses open3() to use the ssh command +# to connect to a remote statistics server. 
+# +# Command line options: +# +# -c STRING set what config file to use for options +# -D if present, tell the server to daemonize +# +# Configuration file format: +# +# server = STRING location of statistics server +# port = INT port number server listens on +# ssh_host = STRING host of remote SSH to tunnel through +# ssh_port = INT port SSH daemon is on +# ssh_key = STRING filename to use for our private key +# ssh_user = STRING username to identify as to SSH server +# +# Copyright 2004, Danga Interactive +# +# Authors: +# Mark Smith <marksmith@danga.com> +# +# License: +# undecided. +# + +# uses +use strict; +use IO::Socket; +use IO::Select; +use IPC::Open3; +use Getopt::Long; +use Carp; +use POSIX ":sys_wait_h"; +use Socket qw(PF_INET IPPROTO_TCP SOCK_STREAM); +use Danga::Socket; + +# Command-line options will override +my ($daemonize, $conf_file); +Getopt::Long::Configure( "bundling" ); +Getopt::Long::GetOptions( + 'D|daemon' => \$daemonize, + 'c|config=s' => \$conf_file, +); +die "You must at least specify --config=FILENAME for me to work.\n" unless $conf_file; +die "File '$conf_file' doesn't exist for configuration.\n" unless -e $conf_file; + +# parse the config file +my %config; +open FILE, "<$conf_file" + or die "Unable to open config file: $!\n"; +while (my $line = <FILE>) { + if ($line =~ /^\s*([^#].*)\s*=\s*(.*)\s*/) { + my ($l, $r) = (trim($1), trim($2)); + if ($l eq 'server') { $config{to}{host} = $r; } + elsif ($l eq 'port') { $config{to}{port} = $r+0; } + elsif ($l eq 'ssh_key') { $config{from}{key} = $r; } + elsif ($l eq 'ssh_host') { $config{from}{host} = $r; } + elsif ($l eq 'ssh_port') { $config{from}{port} = $r+0; } + elsif ($l eq 'ssh_user') { $config{from}{username} = $r; }; + } +} +close FILE; + +# daemonize? +daemonize() if $daemonize; + +# connect to stats server we're replicating TO. messy. +my $dsock = new_connection() + or die "Can't get initial socket... 
exiting.\n"; + +# now setup our ssh open3 sockets +my ($reader, $writer); +my $sshpid = open3($writer, $reader, $reader, 'ssh', '-C', '-tt', '-i', $config{from}{key}, + '-p', $config{from}{port}, '-l', $config{from}{username}, $config{from}{host}); +print $writer "sub *\n"; + +# always kill off our SSH connection +$SIG{TERM} = sub { kill 15, $sshpid; exit 0; }; +$SIG{INT} = sub { kill 15, $sshpid; exit 0; }; + +# variables used later +my $readbuf; + +# this is our main reading loop +my $sobj = new IO::Select; +$sobj->add($reader); + +# post event loop +my $postloop = sub { + # check if somehow ssh died on us? :-/ + my $pid = waitpid -1, WNOHANG; + if ($pid == $sshpid) { + # sleep a few seconds and try to spawn a new one + $sshpid = 0; + while (!$sshpid) { + print "Lost SSH connection... sleeping 5 seconds.\n"; + sleep 5; + $sshpid = open3($writer, $reader, $reader, 'ssh', '-C', '-tt', '-i', $config{from}{key}, + '-p', $config{from}{port}, '-l', $config{from}{username}, $config{from}{host}); + print $writer "sub *\n"; + } + } + + # if our parent socket is closed... + unless ($dsock && !$dsock->{closed}) { + # create a new one if we can + print "Lost SPUD connection... reconnecting...\n"; + $dsock = new_connection(); + unless ($dsock) { + print "\tUnable to connect... 
pausing a second.\n"; + sleep 1; + return 1; + } + } + + # see if we can read from our socket yet + my @ready = $sobj->can_read(0.1); + return 1 unless @ready; + + # must be ready to read + my $bytes = sysread $reader, $readbuf, 1024, length $readbuf; + while ($readbuf =~ s/(.+?)\r?\n//) { + my $line = $1; + next unless $line =~ /^set/i; + $dsock->write("Q$line\r\n"); + } + return 1; +}; + +# now configure the client +Client->SetLoopTimeout(100); # 100 milliseconds timeout +Client->SetPostLoopCallback($postloop); # have it call us + +# now run the event loop +Client->EventLoop(); + +# kill off our child too +kill 15, $sshpid; +print "replicator terminating\n"; + +# daemonizer routine +sub daemonize { + my($pid, $sess_id, $i); + + ## Fork and exit parent + if ($pid = fork) { exit 0; } + + ## Detach ourselves from the terminal + croak "Cannot detach from controlling terminal" + unless $sess_id = POSIX::setsid(); + + ## Prevent possibility of acquiring a controling terminal + $SIG{'HUP'} = 'IGNORE'; + if ($pid = fork) { exit 0; } + + ## Change working directory + chdir "/"; + + ## Clear file creation mask + umask 0; + + ## Close open file descriptors + close(STDIN); + close(STDOUT); + close(STDERR); + + ## Reopen stderr, stdout, stdin to /dev/null + open(STDIN, "+>/dev/null"); + open(STDOUT, "+>&STDIN"); + open(STDERR, "+>&STDIN"); +} + +# little trimming sub +sub trim { + my $res = shift; + $res =~ s/^\s+//; + $res =~ s/\s+$//; + return $res; +} + +# connect anew to the SPUD server we're replicating to +# NOTE: can return undef if we can't get to the server! 
+sub new_connection { + my $sock; + socket $sock, PF_INET, SOCK_STREAM, IPPROTO_TCP; + die "error: can't make socket\n" + unless $sock && defined fileno($sock); + IO::Handle::blocking($sock, 0); + connect $sock, Socket::sockaddr_in($config{to}{port}, Socket::inet_aton($config{to}{host})); + my $dsock = Client->new($sock) + or return undef; + $dsock->watch_write(1); + $dsock->watch_read(1); + return $dsock; +} + +########################################################################### +### Client class for use in processing input/output +package Client; + +use base "Danga::Socket"; + +sub event_read { + # read and toss, we don't care about input from the user here + my Client $self = $_[0]; + my $bref = $self->read; +} + +sub event_err { + # connection died? + my Client $self = $_[0]; + $self->close('event_err'); +} + +sub event_hup { + # connection to server died... + my Client $self = $_[0]; + $self->close('event_hup'); +} diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/rrd-storage --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/rrd-storage Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,132 @@ +#!/usr/bin/perl + +use strict; +use RRDs; +use IO::Socket::INET; +use Getopt::Long; +use Danga::Daemon; + +# parse out command line options +my ($server, $port, $rrd_path); +my $res = GetOptions( + "server=s" => \$server, + "port=i" => \$port, + "rrd-path=s" => \$rrd_path, +); +die "Unable to parse options.\n" unless $res; + +# make sure we have a minimum set of options +die "Please specify a path to access the RRD files in --rrd-path.\n" + unless $rrd_path; +die "RRD path $rrd_path does not exist or is not a directory.\n" + unless -e $rrd_path && -d $rrd_path; +die "Requires --server (and a --port if it's not 9107).\n" + unless $server; + +# setup more internal variables +$port ||= 9107; +my $spud_server = "$server:$port"; + +# connect to local spud server +my $sock = IO::Socket::INET->new(PeerAddr => $spud_server) + or die "Can't connect to server: $!\n"; 
+print $sock "sub *\r\n"; + +# data storage +my $shutdown = 0; +my $lastpurge = time(); +my (%data, %exists); + +# now we want to daemonize +Danga::Daemon::daemonize( + \&worker, + { + interval => 1, + } +); + +# process updates as they come in +sub worker { + # try to reconnect? + debug("Worker start..."); + unless ($sock) { + $sock = IO::Socket::INET->new(PeerAddr => $spud_server) + or return; + print $sock "sub *\r\n"; + } + + # loop and process what they're giving us + debug("Beginning main loop..."); + while (<$sock>) { + my $line = $_; + $line =~ s/[\r\n]+$//; + next unless $line =~ /^set\s+(\d+)\s+(\S+)\s+(\d+)$/i; + my ($what, $time, $dpoint) = ($2, $1, $3); + + # fix up the colons in $what to be underscores :/ + $what =~ s/:/_/g; + push @{$data{$what} ||= []}, [ $time, $dpoint ]; + + # and if necessary, do a purge + if ($Danga::Daemon::stop || ($lastpurge + 60 < time())) { + do_purge(); + $lastpurge = time(); + } + + # done if we were told to shutdown + return if $Danga::Daemon::stop; + } + + # got undef from $sock... so undef $sock itself + debug("Oops, lost connection to SPUD server..."); + $sock = undef; +} + +sub do_purge { + debug("** Beginning purge..."); + + # dump to file + my ($keys, $count); + foreach my $what (keys %data) { + # update this particular file + $keys++; + my $fn = "$rrd_path/$what"; + unless ($exists{$fn}) { + unless (-e $fn) { + RRDs::create($fn, + "--start", "-120", # up to two minutes ago... 
should be enough + "--step", "5", # data is expected to be 5 seconds apart + "DS:val:GAUGE:10:U:U", # all data in 'val', 10 seconds before UNKNOWN + 'RRA:AVERAGE:0.5:1:1440', # past 2 hours of data + 'RRA:AVERAGE:0.5:60:288', # 5 minute averages for the day + 'RRA:AVERAGE:0.5:360:336', # 30 minute averages for a week + 'RRA:AVERAGE:0.5:720:720', # 1 hour averages for 30 days + 'RRA:AVERAGE:0.5:17280:365', # 1 day averages for a year + 'RRA:AVERAGE:0.5:120960:520', # 1 week averages for 10 years + ); + if (my $err = RRDs::error) { + die "Error creating RRD file: $err\n"; + } + } + $exists{$fn} = 1; + } + + # now pipe out the updates + my @updates; + foreach my $row (@{$data{$what}}) { + $count++; + push @updates, "$row->[0]:$row->[1]"; + } + RRDs::update($fn, @updates); + #debug("\t$what: ", scalar(@updates), " written"); + if (my $err = RRDs::error) { + warn "WARNING: Error updating $fn: $err\n"; + } + } + debug("\tupdates processed: $count"); + debug("\tfiles touched: $keys"); + debug("\tratio: %.2f updates per file\n", ($count / $keys)); + + # and now that we're done with that old data ... + %data = (); +} diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/bin/server --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/server Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,465 @@ +#!/usr/bin/perl +# +# Danga's Statistics Server +# This program listens on a port and acts as a very generic data server. It lets you get +# and set data as well as subscribe to information flows given glob patterns. Commands: +# +# get <what> +# get some particular data. also gives you the time this data was set. you can also +# specify a pseudo-regexp here in the same format as the subscribe command. +# [q]set [time] <what> <data> +# set what to be data. if you specify a time, it will set the time you set it as that +# time. also, if you specify qset, you will not be given an OK prompt afterwards. 
+# quickset <on|off> +# if you turn quickset on, sets will be processed without processing the subscriptions, +# instead waiting until you turn quickset off, at which point all pending subscriptions +# will be processed. +# subscribe <pattern> +# pattern is some combination of letters and asterisks. "foo.*" subscribes to anything +# that happens to start with "foo.". you can also subscribe to simply "*" to get all +# information that gets set. or, "*.system" to see everything relating to a system, +# e.g. mysql, memcache, etc. +# list +# list your subscription patterns. if you want to remove subscriptions, you need to +# reconnect to the server. +# listall +# list all active subscriptions. (see what other people are watching, if you care.) +# quit +# disconnect your socket nicely. +# shutdown +# kill the statistics server. +# uptime +# print out the current server statistics... number of keys and subscriptions and the +# time the server has been running. +# +# Command line options: +# +# -p INT set the port the server should listen on +# -D if present, tell the server to daemonize +# +# Copyright 2004, Danga Interactive +# +# Authors: +# Mark Smith <marksmith@danga.com> +# +# License: +# undecided. +# + +package StatServ; + +use strict; +use Getopt::Long; +use IO::Socket::INET; +use POSIX; +use Carp; +use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET); +use constant TCP_KEEPIDLE => 4; # Start keeplives after this period +use constant TCP_KEEPINTVL => 5; # Interval between keepalives +use constant TCP_KEEPCNT => 6; # Number of keepalives before death + +# declare our globals/defaults +my $conf_port = 9107; +my $daemonize = 0; +my $uptime = time(); +my $quickset = 0; +my $lastcleantime = time(); + +# internal data, not to be touched externally +my %data; # { "fact-name" => [ [ set-time, data ], [ set-time, data ], ... ] } +my @sortedkeys; # sorted keys (for pretty output) +my %subs; # { "fact-name" => [ $subscriber, $subscriber, ... 
] } +my @sublist; # list of subscriptions + +# Command-line options will override +Getopt::Long::Configure('bundling'); +Getopt::Long::GetOptions( + 'p|port=i' => \$conf_port, + 'D|daemon' => \$daemonize, +); + +# establish SERVER socket, bind and listen. +my $server = IO::Socket::INET->new(LocalPort => $conf_port, + Type => SOCK_STREAM, + Proto => 'tcp', + Blocking => 1, + Reuse => 1, + Listen => 10) + or die "Error creating socket: $@\n"; + +# make socket nonblocking +IO::Handle::blocking($server, 0); + +my $accept_handler = sub { + my $csock = $server->accept(); + return unless $csock; + + IO::Handle::blocking($csock, 0); + setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; + + # Enable keep alive + (setsockopt($csock, SOL_SOCKET, SO_KEEPALIVE, pack("l", 1)) && + setsockopt($csock, IPPROTO_TCP, TCP_KEEPIDLE, pack("l", 30)) && + setsockopt($csock, IPPROTO_TCP, TCP_KEEPCNT, pack("l", 10)) && + setsockopt($csock, IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30)) && + 1 + ) || die "Couldn't set keep-alive settings on socket (Not on Linux?)"; + + my $client = Client->new($csock); + $client->watch_read(1); +}; + +# daemonize if we should +daemonize() if $daemonize; + +Client->OtherFds(fileno($server) => $accept_handler); +Client->EventLoop(); +exit 0; + +############################################################################# +###### SUBROUTINES ########################################################## +############################################################################# + +sub daemonize { + my($pid, $sess_id, $i); + + ## Fork and exit parent + if ($pid = fork) { exit 0; } + + ## Detach ourselves from the terminal + croak "Cannot detach from controlling terminal" + unless $sess_id = POSIX::setsid(); + + ## Prevent possibility of acquiring a controling terminal + $SIG{'HUP'} = 'IGNORE'; + if ($pid = fork) { exit 0; } + + ## Change working directory + chdir "/"; + + ## Clear file creation mask + umask 0; + + ## Close open file descriptors + 
close(STDIN); + close(STDOUT); + close(STDERR); + + ## Reopen stderr, stdout, stdin to /dev/null + open(STDIN, "+>/dev/null"); + open(STDOUT, "+>&STDIN"); + open(STDERR, "+>&STDIN"); +} + +# internal setter that has the logic that actually handles setting key/value +# pairs. also performs notifications, unless specifically told otherwise. +sub _set { + my ($time, $key, $val, $nosubs) = @_; + $nosubs ||= 0; + + # push onto array + unless (defined $data{$key}) { + $data{$key} = []; + @sortedkeys = sort keys %data; + } + unshift @{$data{$key}}, [ $time, "$val" ]; + pop @{$data{$key}} if @{$data{$key}} > 100; + + # handle subscriptions + unless ($nosubs) { + unless (defined $subs{$key}) { + $subs{$key} = []; # define it and set blank if it's not defined + foreach my $sub (@sublist) { + push @{$subs{$key}}, $sub->[0] + if $key =~ /$sub->[1]/i; + } + } + $_->set_line("$time $key $val") foreach @{$subs{$key}}; + } + + # done + return 1; +} + +# internal getter. gets a value for a single key. +sub _get { + my $key = shift; + return wantarray ? () : undef unless $data{$key}; + + # now, get data. return all points? or just most recent? + return $data{$key} if wantarray; + return undef unless @{$data{$key}}; + return $data{$key}->[0]->[1]; # text of most recent value +} + +# internal incrementers. simply gets a value and sets it to val +/- 1. +# this is used by stats within the server only, and not exposed to the +# outside world as of right now. 
+sub _incr { _set(time(), $_[0], _get($_[0])+1, 1); } +sub _decr { _set(time(), $_[0], _get($_[0])-1, 1); } + +##################################################################### +### C L I E N T C L A S S +##################################################################### +package Client; + +use Danga::Socket (); +use base qw{Danga::Socket}; +use fields qw(read_buf t_out_buf do_buffer); + +sub new { + my Client $self = shift; + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + + StatServ::_incr("server.clients.total"); + return $self; +} + +# Client +sub event_read { + my $self = shift; + + my $bref = $self->read(1024); + return $self->close() unless defined $bref; + $self->{read_buf} .= $$bref; + + StatServ::_incr("server.clients.event_reads"); + while ($self->{read_buf} =~ s/^(.+?)\r?\n//) { + my $line = $1; + $self->process_line( $line ); + } +} + +# Client +sub event_err { StatServ::_incr("server.clients.event_errs"); my $self = shift; $self->close; } +sub event_hup { StatServ::_incr("server.clients.event_hups"); my $self = shift; $self->close; } + +sub process_line { + my Client $self = shift; + my $line = shift; + + StatServ::_incr("server.clients.process_lines"); + + # clean out closed clients every few seconds + my $now = time(); + if ($lastcleantime + 10 < $now) { + foreach my $key (keys %subs) { + @{$subs{$key}} = grep { !$_->{closed} } @{$subs{$key}}; + } + @sublist = grep { !$_->[0]->{closed} } @sublist; + $lastcleantime = $now; + } + + if ($line =~ /^(\w+)\s*(.*)/) { + my ($cmd, $args) = ($1, $2); + $cmd = lc($cmd); + my ($one, $two); + if ($args =~ /^([-\w.:]+)\s+(.+)/) { + ($one, $two) = ($1, $2); + } + + # statistics keeping + StatServ::_incr("server.clients.commands"); + + # see if this is a command we know about + if ($cmd eq 'get') { + # get something + StatServ::_incr("server.com_$cmd"); + return $self->err_line('no_args') unless $args; + if (defined $data{$args} && ref $data{$args} eq 'ARRAY') { + my $ct = 0; + 
$self->begin_output(); + foreach my $row (@{$data{$args}}) { + $ct++; + $self->ok_line("$ct $row->[0] $row->[1]"); + } + $self->ok_line('done'); + return $self->end_output(); + } else { + # see if this should be used as a regular expression + if ($args =~ /[.*]/) { + $args =~ s/\./\\./g; + $args =~ s/\*/.*/g; + $self->begin_output(); + foreach my $key (@sortedkeys) { + if ($key =~ /$args/i) { + my $first = $data{$key}->[0]; + $self->ok_line("$key $first->[0] $first->[1]"); + } + } + $self->ok_line('done'); + return $self->end_output(); + } + return $self->err_line("not_found"); + } + + } elsif ($cmd =~ /^(q)?set$/) { + # set something + StatServ::_incr("server.com_$cmd"); + return $self->err_line('need_two_args') unless defined $one && defined $two; + my $quiet = $1 eq 'q' ? 1 : 0; + + # see if $one happens to be a number? that means they gave us a time to use. + my $time; + if ($one =~ /^\d+$/ && $two =~ /^([-\w.:]+)\s+(.+)$/) { + $time = $one + 0; + ($one, $two) = ($1, $2); + } + $time = time() unless defined $time; + + # push data onto front + StatServ::_set($time, $one, $two); + + # all done + return $quiet ? 1 : $self->ok_line(); + + } elsif ($cmd =~ /^sub/) { + # subscribe to something + # convert "*.mysql" to ".*\.mysql", etc. + StatServ::_incr("server.com_$cmd"); + return $self->err_line('no_args') unless $args; + $args =~ s/\./\\./g; + $args =~ s/\*/.*/g; + return $self->ok_line($args) # no dupes in @sublist! + if grep { $args eq $_->[1] && $self == $_->[0] } @sublist; + push @sublist, [ $self, $args ]; + + # now see what subscriptions this matches + foreach my $key (keys %subs) { + if ($key =~ /$args/i) { + # no dupes! 
+ next if grep { $_ == $self } @{$subs{$key}}; + push @{$subs{$key}}, $self; + } + } + + # return okay to the person who set something + return $self->ok_line($args); + + } elsif ($cmd =~ /^list(all)?$/) { + # list out your subscriptions + StatServ::_incr("server.com_$cmd"); + foreach my $sub (@sublist) { + $self->ok_line("$sub->[0] $sub->[1]") + if $1 eq 'all' || $sub->[0] == $self; + } + return $self->ok_line('done'); + + } elsif ($cmd eq 'shutdown') { + # kill ourselves + StatServ::_incr("server.com_$cmd"); + exit 0; + + } elsif ($cmd eq 'uptime') { + # figure out our uptime + StatServ::_incr("server.com_$cmd"); + my $c = time() - $uptime; + my ($d, $h, $m, $s) = (int($c / 86400), int(($c % 86400) / 3600), + int(($c % 3600) / 60), int($c % 60)); + my $l = sprintf 'stats-server up %d days %02d:%02d:%02d', $d, $h, $m, $s; + $self->begin_output(); + $self->ok_line($l); + $l = sprintf '%d subscriptions, %d total keys', scalar(@sublist), scalar(@sortedkeys); + $self->ok_line($l); + $self->ok_line('done'); + return $self->end_output(); + + } elsif ($cmd eq 'quickset') { + StatServ::_incr("server.com_$cmd"); + return $self->err_line('not_on_or_off') unless $args eq 'on' || $args eq 'off'; + + # now set it? + $quickset = $args eq 'on' ? 1 : 0; + return 1; + + } elsif ($cmd eq 'quit') { + # simply quit ... at some point we should clear out subscriptions that they + # have setup... but until then? nah. 
+ StatServ::_incr("server.com_$cmd"); + return $self->close; + + } + } + + # oops, don't know what they wanted + return $self->err_line('unknown_command'); +} + +sub ok_line { + my Client $self = shift; + my $args = shift; + if ($self->{do_buffer}) { + StatServ::_incr("server.lines.ok.buffered"); + $self->{t_out_buf} .= "OK $args\r\n"; + } else { + StatServ::_incr("server.lines.ok.immediate"); + $self->write("OK $args\r\n"); + } + return 1; +} + +sub set_line { + my Client $self = shift; + my $args = shift; + if ($self->{do_buffer}) { + StatServ::_incr("server.lines.set.buffered"); + $self->{t_out_buf} .= "SET $args\r\n"; + } else { + StatServ::_incr("server.lines.set.immediate"); + $self->write("SET $args\r\n"); + } + return 1; +} + +sub err_line { + my Client $self = shift; + my $err_code = shift; + my $err_text = { + 'unknown_command' => "Unknown server command", + 'not_found' => "Information not in record", + 'no_args' => "No arguments provided", + 'need_two_args' => "Two arguments needed", + 'no_subscriptions' => "No subscriptions found", + 'not_on_or_off' => "Argument wasn't of form 'on' or 'off'", + }->{$err_code}; + + if ($self->{do_buffer}) { + StatServ::_incr("server.lines.err.buffered"); + $self->{t_out_buf} .= "ERR $err_code $err_text\r\n"; + } else { + StatServ::_incr("server.lines.err.immediate"); + $self->write("ERR $err_code $err_text\r\n"); + } + return 0; +} + +sub begin_output { + my Client $self = shift; + StatServ::_incr("server.lines.begin_outputs"); + $self->{do_buffer} = 1; + return 1; +} + +sub end_output { + my Client $self = shift; + StatServ::_incr("server.lines.end_outputs"); + $self->{do_buffer} = 0; + $self->write($self->{t_out_buf}); + $self->{t_out_buf} = ''; + return 1; +} + +sub eurl +{ + my $a = $_[0]; + $a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg; + $a =~ tr/ /+/; + return $a; +} + +# Local Variables: +# mode: perl +# c-basic-indent: 4 +# indent-tabs-mode: nil +# End: diff -r 6ad827ca8a90 -r d62f415767a4 
src/spud/bin/wrapper --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/bin/wrapper Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,162 @@ +#!/usr/bin/perl +# +# Danga's Statistics Wrapper +# This little program simply runs a command and pipes the output to a +# statistics server so that you can see the output of the command. Data +# will be stored in $KEY.output and $KEY.status, so you can determine +# exactly how long a run took. +# +# Command line options: +# +# -s STRING server to connect to +# -p INT port statistics server is running on +# -k STRING key to put data in ($KEY.status, $KEY.output) +# -e STRING command to execute +# -c STRING file to load config data from +# +# Configuration file format: +# +# server = STRING location of server +# port = INT port of server +# +# Copyright 2004, Danga Interactive +# +# Authors: +# Mark Smith <marksmith@danga.com> +# +# License: +# undecided. +# + +# uses +use strict; +use IO::Socket; +use IPC::Open3; +use Getopt::Long; +use Carp; +use POSIX; + +# Command-line options will override +my ($daemonize, $conf_file, $key, $host, $port, $cmd); +Getopt::Long::Configure( "bundling" ); +Getopt::Long::GetOptions( + 'D|daemon' => \$daemonize, + 'c|config=s' => \$conf_file, + 'k|key=s' => \$key, + 'p|port=i' => \$port, + 's|server=s' => \$host, + 'e|exec=s' => \$cmd, +); + +# if we don't have all the data we need, parse a config file +die "No key (-k KEY) specified.\n" unless $key; +unless ($port && $host) { + die "You must at least specify --config=FILENAME for me to work.\n" unless $conf_file; + die "File '$conf_file' doesn't exist for configuration.\n" unless -e $conf_file; + + # parse the config file + open FILE, "<$conf_file" + or die "Unable to open config file: $!\n"; + while (my $line = <FILE>) { + if ($line =~ /^\s*([^#].*)\s*=\s*(.*)\s*/) { + my ($l, $r) = (trim($1), trim($2)); + if ($l eq 'server') { $host = $r; } + elsif ($l eq 'port') { $port = $r+0; } + } + } + close FILE; +} + +# if cmd was specified, use that 
+@ARGV = split /\s+/, $cmd if $cmd; + +# error checking +die "No server to connect to.\n" unless $port && $host; +die "No arguments to run.\n" unless @ARGV; +die "File to run ($ARGV[0]) doesn't seem to exist.\n" unless -e $ARGV[0]; + +# daemonize? +daemonize() if $daemonize; + +# connect to stats server we're replicating TO +my $socket = IO::Socket::INET->new(Proto => 'tcp', + PeerAddr => $host, + PeerPort => $port,) + or die "Unable to connect to local stats server ($host:$port): $!\n"; +$socket->autoflush(1); + +# now setup our open3 socket +my ($reader, $writer); +my $procpid = open3($writer, $reader, $reader, @ARGV); + +# parent pid +my $ppid = $$; + +# now fork +my $kidpid; +die "Unable to fork: $!\n" unless defined($kidpid = fork()); + +# basic parts that handle reading/writing from the two sides +if ($kidpid) { + # parent + while (defined (my $line = <$socket>)) { + # we do this to flush the read buffer, but we don't need to + # echo this information to anybody, as all of this is going + # to be in the form of "OK" and such saying that data got set + } + kill("TERM" => $kidpid); # death to the children + waitpid $kidpid, 0; +} else { + # child + print $socket "QSET $key.status started\n"; + while (defined (my $line = <$reader>)) { + # take the output and set it + $line = trim($line); + print $socket "QSET $key.output $line\n"; + } + print $socket "QSET $key.status finished\n"; + $socket->close; + + # now kill off our parent + kill("TERM" => $ppid); +} + +# daemonizer routine +sub daemonize { + my($pid, $sess_id, $i); + + ## Fork and exit parent + if ($pid = fork) { exit 0; } + + ## Detach ourselves from the terminal + croak "Cannot detach from controlling terminal" + unless $sess_id = POSIX::setsid(); + + ## Prevent possibility of acquiring a controling terminal + $SIG{'HUP'} = 'IGNORE'; + if ($pid = fork) { exit 0; } + + ## Change working directory + chdir "/"; + + ## Clear file creation mask + umask 0; + + ## Close open file descriptors + close(STDIN); + 
close(STDOUT); + close(STDERR); + + ## Reopen stderr, stdout, stdin to /dev/null + open(STDIN, "+>/dev/null"); + open(STDOUT, "+>&STDIN"); + open(STDERR, "+>&STDIN"); +} + +# little trimming sub +sub trim { + my $res = shift; + $res =~ s/^\s+//; + $res =~ s/[\r\n\s]+$//; + return $res; +} diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/conf/authorized_keys --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/conf/authorized_keys Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,1 @@ +environment="LJUSERNAME=replicator",environment="LJCOMMAND=replicator" ssh-dss SOMEKEY replicator@comment.field.stuff diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/conf/cmdshell.conf --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/conf/cmdshell.conf Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,5 @@ +# configuration for the command shell. these are commands setup so that if +# someone comes in over SSH and their key specifies an environment variable +# of LJCOMMAND, it will be cross referenced against this list + +replicator = 1.2.3.4 : 9107 diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/conf/sshd_config --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/conf/sshd_config Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,27 @@ +### begin config section ##################################################### + +# basic setup information. we run on a different port because this +# SSH daemon's only job is to let our lj management user through. +Port 222 + +# important: only allow us in. set this to the username that you want to +# allow into the system. see the documentation for more setup information. +AllowUsers ljmgmt + +### end config section ####################################################### + +# only allow sshv2 +Protocol 2 + +# do not allow root to log in (they shouldn't be able to due to AllowUsers +# above, but just to be extra paranoid...) 
+PermitRootLogin no + +# must use SSH keys to log in to this daemon +PasswordAuthentication no + +# be very silent (should also touch .hushlogin in the ljmgmt home directory) +PrintMotd no + +# required for our authorized_keys file to work +PermitUserEnvironment yes diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/conf/stats-local.conf --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/conf/stats-local.conf Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,15 @@ +# configuration for the replicator and internal statistic accumulator. + +# first set up our main configuration information to point to where the server +# can be found. this server is the local network server at the near +# endpoint. this is where data is replicated TO. +server = 127.0.0.1 +port = 9106 + +# now set up the information about the SSH connection we're going to use to +# gather data from the remote server. this is where we will get the data, +# or, the point we're replicating FROM. +ssh_key = /home/lj/.ssh/id_dsa +ssh_host = 1.2.3.4 +ssh_port = 222 +ssh_user = ljmgmt diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/conf/stats.conf --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/conf/stats.conf Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,7 @@ +# global configuration options; this points to our SPUD server +server = 1.2.3.4 +port = 9107 + +# automatically add monitoring plugins for all of the LiveJournal services. the +# times after the equals sign are intervals between plugin updates in seconds. 
+config_generator(mysql = 5, perlbal = 5, memcached = 5, mogstored = 60, mogilefsd = 5) diff -r 6ad827ca8a90 -r d62f415767a4 src/spud/doc/README.txt --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/spud/doc/README.txt Sun Mar 01 19:21:55 2009 +0000 @@ -0,0 +1,67 @@ +SPUD -- Statistic Polling and Updating Daemon + System Productivity and Utilization Daemon + ...or something + +-- Introduction -- +SPUD is a set of programs that are designed to collect statistics using +a variety of methods that are easily extendable to gather information on +almost anything you could want to get statistics on. It then stores these +bits of data in a simple hash server that supports retrieving data en masse +as well as subscribing to update events. + + +-- The Components -- +There are several pieces to the system: + + - server + A simple server that stores key/value pairs with 100 data points + of history. + + - gatherer + The program that gathers the actual numbers and puts them in a + stats-server. + + - replicator + Works in conjunction with a properly configured SSH daemon and the + cmdshell utility to get stats from one secure network to another. + + - wrapper + A little script that effectively "wraps" another shell command and + pipes its STDOUT/STDERR to appropriately named keys on the server. + Also keeps track of the start/end times of the last time the command + ran. + + - cmdshell + A simple utility program that works with stats-replicator. + +For more information on each component, please see the various +configuration files in the conf directory as well as the opening comments +of each program. They are fairly well described in there. + + +-- Setup -- +More documentation to come after the bugs are worked out and the system is +actually set up and in use. But, in general: + +1. Run a stats-server in your internal network somewhere and note the port. + +2. Set up the stats.conf for the statistic gatherers. 
Then run the gatherer once on +a machine that's got some spare time to do gathering. Make sure to point +this configuration at the server from step 1. + +3. Set up an SSH daemon and dummy account on a border machine that you can +get to from the outside world and that can also see the internal network. +Set the shell of the dummy user to cmdshell and configure cmdshell. Also +set up the authorized_keys file appropriately. You will want a command +(e.g. replicator) that points at the server set up in step 1. + +4. Set up a second stats-server on your local machine/network. Note down +the port again. + +5. Set up the stats-local.conf on your local end to point to the SSH daemon +that you set up in step 3. Now run the replicator and point it at this +config file and it will start copying data from the first stats server to +the second. + +That's it. Now get some statistics displayers, and all of the data gathered +and put onto the first server will be replicated over to the second server. --------------------------------------------------------------------------------