#!/usr/bin/perl
#
# janchor.pl: By Jeremy Nickurak, 2002

# BEGIN CONFIGURATION
use constant {
    SERVICE_NAME => 'headlines.localhost', # Full name (JID domain) of this headline component.
    SERVER       => '127.0.0.1',           # IP address or hostname of the Jabber server to connect to.
    PORT         => 5347,                  # Component port on the Jabber server.
    SECRET       => 'secret',              # Shared secret for the component connection.
};
# END BASIC CONFIGURATION

# BEGIN EXTRA CONFIGURATION
use constant {
    # Delay between RSS checks. Many sites, notably Slashdot, will be very
    # upset if you use less than a 30 minute delay.
    # NOTE(review): the main loop counts one-second Process() polls, which may
    # return early when traffic arrives, so this is roughly (not exactly) seconds.
    RSS_DELAY   => 1800,
    RSS_TIMEOUT => 15,              # Timeout (seconds) for HTTP fetches of RSS sources.
    SUB_FILE    => 'registrations', # Subscription DB.
    CACHE_FILE  => 'rss_cache',     # Cache of RSS items already seen.
    SOURCE_FILE => 'sources',       # Source DB (source name => feed URL).
    STATUS_FILE => 'status',        # Per-source status DB.
    VERBOSE     => 2,               # Logging level: 1 = WARN, 2 = +INFO, 3 = +DBUG.
    ALLOW       => 'all',           # JID allowed to register new sources, or 'all' for anyone.
};
# END EXTRA CONFIGURATION

BEGIN {
	# Load optional site overrides. A missing file is not an error, but a file
	# that fails to compile used to be ignored silently (do() only sets $@).
	my $rc = '/etc/jabber/janchor.rc';
	do $rc;
	warn "Error loading $rc: $@" if $@;
}

use Net::Jabber qw(Component);
use MLDBM 'DB_File';
use Text::Iconv;
use LWP::UserAgent;
use XML::RSS;
use Unicode::String qw(utf8);
use strict;

# Source status enumerations
my $SOURCE_OK         = 0;
my $SOURCE_HTTP_ERROR = 1;
my $SOURCE_XML_ERROR  = 2;
my $SOURCE_OFFLINE    = 3;

# DB-tied hashes
my %reg;      # subscriber JID => { topic => 1, ... }
my %cache;    # topic => { item key => 1, ... } for items already seen
my %sources;  # lower-case topic => feed URL
my %status;   # topic => one of the $SOURCE_* values

# SIGHUP forces an immediate headline check. Not totally convinced this is
# correct (it may fire while another fetch is in progress), but it's convenient.
$SIG{HUP} = \&do_headlines;
# SIGKILL cannot be caught, so a handler for it would never run; only TERM and
# INT get the clean shutdown.
$SIG{TERM} = \&SigStop;
$SIG{INT} = \&SigStop;

tie (%reg, 'MLDBM', SUB_FILE) or die ("Cannot tie to " . SUB_FILE."!\n");
tie (%cache, 'MLDBM', CACHE_FILE) or die ("Cannot tie to " . CACHE_FILE."!\n");
tie (%sources, 'MLDBM', SOURCE_FILE) or die ("Cannot tie to " . SOURCE_FILE."!\n");
tie (%status, 'MLDBM', STATUS_FILE) or die ("Cannot tie to " . STATUS_FILE."!\n");


# Messages to be associated with source status
my %status_messages = (
    $SOURCE_OK         => "",
    $SOURCE_HTTP_ERROR => "HTTP Error",
    $SOURCE_XML_ERROR  => "RSS Parse Error",
    $SOURCE_OFFLINE    => "Offline"
);
# Presence <show/> value for each source status (undef: plain available).
my %status_show = (
    $SOURCE_OK         => undef,
    $SOURCE_HTTP_ERROR => "dnd",
    $SOURCE_XML_ERROR  => "xa",
    $SOURCE_OFFLINE    => ""
);

my $Connection = do_connect();
init_status();

# Main loop: service the Jabber stream for RSS_DELAY one-second Process()
# polls, then check every source. Stop() exits the process, so under normal
# operation this loop never ends.
# (The former "while (true)" used a bareword, which "use strict" rejects.)
while (1) {
  for my $tick (1 .. RSS_DELAY) {
    Stop("Process failed in main loop.") unless defined($Connection->Process(1));
  }
  do_headlines();
}

log1("ERROR: The connection was killed...\n");

exit(0);

sub InMessage{
  # Message callback. Error stanzas are logged; messages this component sent
  # to itself are ignored; every other message is answered with the list of
  # available sources plus instructions for subscribing to one.
  my ($sid, $message) = @_;
  my $from = $message->GetFrom();
  my $to   = $message->GetTo();
  my $type = $message->GetType();

  if ($type eq 'error') {
    log1("Error received: \n" . $message->GetXML());
    return;
  }
  return if lc($from) eq lc(SERVICE_NAME);

  my @listing = map { "  $_: $sources{$_}\n" } sort(keys(%sources));
  my $reply = join('', "Available sources:\n", @listing)
    . "\nYou can subscribe to a newsfeed by adding to your roster users in the format\nSOURCENAME\@"
    . SERVICE_NAME . ".";

  # Bounce the same stanza back with the addresses swapped.
  $message->SetTo("$from");
  $message->SetBody($reply);
  $message->SetFrom("$to");
  $Connection->Send($message);
}

sub InIQ{
  # IQ callback.
  #  - jabber:iq:register get: return the registration form (or, when only
  #    ALLOW may register, a refusal text for everyone else).
  #  - jabber:iq:register set: name + url adds/updates a source, name alone
  #    removes it (subscribers are told it no longer exists).
  #  - any other get/set IQ with a query: error 501.
  #  - result/error IQs are never answered.
  my $sid = shift;
  my $iq = shift;

  my $from = $iq->GetFrom();
  my $to = $iq->GetTo();
  my $type = $iq->GetType();
  my $query = $iq->GetQuery();

  # Replying to a result or error IQ with another error can bounce stanzas
  # back and forth indefinitely; only requests get answers.
  return unless (defined($type) and ($type eq 'get' or $type eq 'set'));
  return unless $query;

  if ($query->GetXMLNS() ne 'jabber:iq:register') {
    _iq_error($iq, $from, $to, 501, "Not Implemented");
    return;
  }

  if ($type eq 'get') {
    $iq->SetType('result');
    $iq->SetFrom($to);
    $iq->SetTo($from);
    if (_may_register($iq)) {
      $query->SetInstructions('Please supply the name of the source, and optionally a URL to reach that source.
If you supply a URL, it will be assumed that you are adding or updating  a headline source. Otherwise, it will be assumed that you are deleting that source.');
      $query->SetName("");
      $query->SetURL("");
    } else {
      $query->SetInstructions('Only the administrator may register sources.');
    }
    $Connection->Send($iq);
    return;
  }

  # type 'set'
  unless (_may_register($iq)) {
    _iq_error($iq, $from, $to, 403, "Forbidden");
    return;
  }

  my $url = $query->GetURL();
  my $name = $query->GetName();
  if ($url && $name) {
    my $topic = lc($name);
    log2 ("Subscribing to $name at $url");
    $sources{$topic} = $url;
    # Pass the lower-cased key: do_rss_source looks its argument up in %sources.
    do_rss_source($topic);
  } elsif ($name) {
    log2 ("Unsubscribing from $name");

    # Send presence to people who are still subscribed to this source.
    my $presence = Net::Jabber::Presence->new();
    $presence->SetType("unsubscribed");
    $presence->SetStatus("Source does not exist");
    $presence->SetFrom($name . '@' . SERVICE_NAME);
    deliver_sub($presence, $name);
    delete $status{lc($name)};
    delete $sources{lc($name)};
  }
  $iq->SetType('result');
  $iq->SetFrom($to);
  $iq->SetTo($from);
  $Connection->Send($iq);
}

sub _may_register {
  # True when registration is open to all, or the IQ's sender (bare JID) is ALLOW.
  my $iq = shift;
  return 1 if ALLOW eq 'all';
  # GetFrom() with no argument returns a plain string; "jid" yields a JID
  # object, which is needed to strip the resource.
  return $iq->GetFrom("jid")->GetJID("base") eq ALLOW;
}

sub _iq_error {
  # Turn $iq into an error reply (addresses swapped) and send it.
  my ($iq, $from, $to, $code, $text) = @_;
  $iq->SetType('error');
  $iq->SetFrom($to);
  $iq->SetTo($from);
  $iq->SetErrorCode($code);
  $iq->SetError($text);
  $Connection->Send($iq);
}

sub InPresence {
  # Presence callback for SOURCE@SERVICE_NAME addresses:
  #   subscribe                - record the subscription in %reg, approve it,
  #                              then announce the source's current status.
  #   available / probe / none - subscribers get the source's current status
  #                              (or "unsubscribed" if the source is gone);
  #                              non-subscribers get no reply.
  #   unsubscribe              - forget the subscription and confirm it.
  my $sid = shift;
  my $presence = shift;
  my $from = $presence->GetFrom();
  my $to = $presence->GetTo();
  my ($topic) = split /@/, $to;
  $topic = lc($topic);
  my $type = $presence->GetType();
  log3 ("Got $type type presence from $from to $to.");

  if ($type eq 'subscribe') {
    my $subs = $reg{$from};
    $subs->{$topic} = 1;
    $reg{$from} = $subs;

    $presence->SetType("subscribed");
    $presence->SetTo("$from");
    $presence->SetFrom("$to");
    $Connection->Send($presence);

    # Announce the source's status in a separate stanza; re-sending the
    # mutated "subscribed" stanza would only repeat the approval.
    my $announce = Net::Jabber::Presence->new();
    $announce->SetTo("$from");
    $announce->SetFrom("$to");
    if (defined($sources{$topic})) {
      _apply_source_status($announce, $topic);
    } else {
      $announce->SetShow("dnd");
      $announce->SetStatus("Source does not exist");
    }
    $Connection->Send($announce);
  } elsif (($type eq 'available') or ($type eq 'probe') or (not $type)) {
    # Subscriptions are stored under the JID the subscribe came from. Presence
    # may additionally carry a /resource, so fall back to the bare JID.
    # (Plain reads here also avoid autovivifying empty %reg entries.)
    my ($bare_from) = split m{/}, $from;
    my $subs = $reg{$from} || $reg{$bare_from};
    return unless ($subs and $subs->{$topic});

    # Reply with a fresh stanza: echoing the incoming one would keep its type
    # (a probe would be answered with a probe) and the sender's own <show/>.
    my $reply = Net::Jabber::Presence->new();
    if (defined($sources{$topic})) {
      _apply_source_status($reply, $topic);
    } else {
      $reply->SetType("unsubscribed");
      $reply->SetStatus("Source does not exist");
    }
    $reply->SetTo("$from");
    $reply->SetFrom("$to");
    $Connection->Send($reply);
  } elsif ($type eq 'unsubscribe') {
    my $subs = $reg{$from};
    if (ref($subs) eq 'HASH') {
      delete $subs->{$topic};
      $reg{$from} = $subs;
    }
    $presence->SetType("unsubscribed");
    $presence->SetTo("$from");
    $presence->SetFrom("$to");
    $Connection->Send($presence);
  }
}

sub _apply_source_status {
  # Copy $topic's recorded status onto an outgoing presence. <show/> is only
  # set when the status defines one (the OK status maps to undef).
  my ($presence, $topic) = @_;
  my $show = $status_show{$status{$topic}};
  $presence->SetShow($show) if defined $show;
  $presence->SetStatus($status_messages{$status{$topic}});
}

sub do_headlines {
  # Check every configured source in name order, letting the Jabber stream
  # process pending traffic after each one; a dead stream stops the component.
  for my $name (sort keys %sources) {
    do_rss_source($name);
    defined($Connection->Process(0))
      or Stop("Process failed in do_headlines loop");
  }
}

sub do_rss_source {
    # Fetch one source, update its status, and deliver headlines that were not
    # in the feed the last time it was checked.
    #   $topic - source name. %sources, %status and %cache are keyed by the
    #            lower-case name, so fall back to lc() when the exact key is absent.
    my $topic = shift;
    $topic = lc($topic) unless exists $sources{$topic};

    log3("Getting $topic");
    my ($http_code, $http_message, $content) = _fetch_rss($topic);

    if (defined($http_code) and ($http_code >= 300)) {
        log2("Cannot retrieve $topic - skipping: code $http_code");
        # set_status already prefixes "HTTP Error", so only pass code + reason.
        my $reason = defined($http_message) ? "$http_code $http_message" : $http_code;
        set_status($topic, $SOURCE_HTTP_ERROR, 0, $reason);
        return;
    }

    # Parse straight from memory (no temp file); XML::RSS dies on bad input.
    my $rss = XML::RSS->new();
    eval { $rss->parse($content) };
    if ($@) {
        log2 ("Malformed XML on source $topic:\n".$@.".\n");
        set_status($topic, $SOURCE_XML_ERROR, 0, $@);
        return;
    }

    set_status($topic, $SOURCE_OK, 0);
    _deliver_new_items($topic, $rss->{items});
}

sub _fetch_rss {
    # Fetch $sources{$topic} in a forked child and read its report from a
    # pipe: line 1 is the HTTP status code, line 2 the status message, the
    # rest is the body. Returns ($code, $message, $body).
    my $topic = shift;

    my $child;
    my $pid;
    my $fork_count = 0;
    until (defined($pid = open($child, "-|"))) {
        warn "cannot fork: $!";
        die "bailing out" if $fork_count++ > 6;
        sleep 10;
    }
    _fetch_in_child($topic) unless $pid;    # never returns

    log2("In parent, forked $pid");
    my ($http_code, $http_message);
    my $content = "";
    while (my $line = <$child>) {
        $line =~ s/\n$//;
        if (not defined($http_code)) {
            $http_code = $line;
        } elsif (not defined($http_message)) {
            $http_message = $line;
        } else {
            $content .= "$line\n";
        }
    }
    close($child);
    # Reap the child (and any other finished children) so none linger as zombies.
    1 until wait() == -1;

    return ($http_code, $http_message, $content);
}

sub _fetch_in_child {
    # Child side of _fetch_rss: fetch the feed, normalise it to UTF-8 and print
    # the report to STDOUT (the pipe to the parent). Leaves via POSIX::_exit so
    # exit()'s END blocks and destructors -- which would close the inherited
    # tied DB_File handles and connection objects -- do not run in the child.
    my $topic = shift;
    require POSIX;

    eval {
        my $ua = LWP::UserAgent->new(timeout => RSS_TIMEOUT);
        my $response = $ua->get($sources{$topic});
        my $http_code = $response->code;
        my $http_message = $response->message;
        print "$http_code\n";
        print "$http_message\n";

        my $content = $response->content;
        if (defined($content)) {
            # XML::RSS doesn't cope with whitespace before the XML declaration.
            $content =~ s/^\s+//;
            if (($content =~ /.*<\?xml[^>]*encoding[\s]*=[\s]*['"](.*?)["']/) and (uc($1) ne "UTF-8")) {
                my $enc = uc($1);
                my $converted = Text::Iconv->new($enc, "UTF-8")->convert($content);
                if (defined($converted)) {
                    $content = $converted;
                } else {
                    # Conversion failed: mask control and 8-bit bytes instead.
                    $content =~ s/[\x00-\x08\x0b-\x0c\x0e-\x1f\x7f-\xff]/?/g;
                }
                # The text is no longer in the declared encoding; drop the declaration.
                $content =~ s/(.*<\?xml[^>]*)encoding[\s]*=[\s]*['"].*?["']/$1/;
            }
            print utf8($content)->utf8;    # to make sure it is UTF-8
        }
        1;
    } or warn "Fetching $topic failed: $@";

    close(STDOUT);    # flush the pipe; _exit skips stdio buffers
    POSIX::_exit(0);
}

sub _deliver_new_items {
    # Broadcast items of $topic that were not seen on the previous check, then
    # rewrite the cache so it holds exactly the keys present in the current feed.
    # On the first check of a topic the items are only recorded, not sent.
    my ($topic, $items) = @_;

    log3("Looking for new items");
    my $new_topic = !exists $cache{$topic};
    log3($new_topic ? "New topic" : "Not new topic");

    # Work on one deserialised copy instead of re-reading the MLDBM value per item.
    my $seen = $cache{$topic};
    $seen = {} unless ref($seen) eq 'HASH';

    my %current;
    foreach my $item (@{ $items || [] }) {
        # XML::RSS items carry title/link/description; "url" is not an item field.
        my $key = $item->{title} || $item->{link} || $topic;
        $current{$key} = 1;
        next unless ($new_topic or not exists $seen->{$key});

        log2("New item from $topic - $key");
        _broadcast_item($topic, $item) unless $new_topic;

        # Persist right away so an abort mid-feed doesn't resend this item.
        $seen->{$key} = 1;
        $cache{$topic} = $seen;
    }

    # Forget cached items that have since been removed from their source.
    foreach my $key (keys %$seen) {
        next if $current{$key};
        delete $seen->{$key};
        log1("Killing $topic 's cache for $key.");
    }
    $cache{$topic} = $seen;
}

sub _broadcast_item {
    # Send one RSS item to every subscriber of $topic as a headline message
    # carrying a jabber:x:oob link.
    my ($topic, $item) = @_;
    my $msg = Net::Jabber::Message->new();
    $msg->SetMessage(type => 'headline', from => ("$topic" . '@' . SERVICE_NAME), subject => $item->{title}, body => $item->{description});
    $msg->SetBody($item->{title}) unless $msg->GetBody();
    $msg->SetBody($msg->GetBody() . "\n\n" . $item->{link});
    my $oob = $msg->NewX('jabber:x:oob');
    $oob->SetURL($item->{link});
    $oob->SetDesc($item->{title});
    deliver_sub($msg, $topic);
}


sub deliver_sub {
  # Send $item (a message or presence object) to every JID whose %reg entry
  # lists the topic, then pump the stream once. If the stream has died,
  # reconnect, re-announce all sources and flag this one with an XML error.
  my ($item, $topic_name) = @_;
  my $topic = lc($topic_name);

  my @sendees = grep { ($reg{$_})->{$topic} } keys(%reg);
  foreach my $jid (@sendees) {
    $item->SetTo($jid);
    $Connection->Send($item);
  }

  # HACK: Send an extra blank message to ourselves, to increase our chances of
  # triggering a stream death if we've done something bad beforehand.
  my $padding = Net::Jabber::Message->new();
  $padding->SetTo(SERVICE_NAME);
  $padding->SetFrom(SERVICE_NAME);
  $Connection->Send($padding);

  my $stream_alive = defined($Connection->Process(0));
  if ($stream_alive) {
     log2("Notified ".(join(", ", @sendees) || "nobody"));
  }
  else {
     log1("Failed mid-subscription on $topic. Probabbly XML/UTF8 death. Attempting to reconnect.");
     $Connection->Disconnect();
     $Connection = do_connect();
     log1("Successfully reconnected.");
     init_status();
     set_status($topic, $SOURCE_XML_ERROR, 0, "Jabber stream died mid-connection, probabbly due to a XML or UTF8 error.");
  }
}

sub SigStop {
   # %SIG handler: Perl passes the signal's name as the only argument.
   my ($signame) = @_;
   log1("Received signal: $signame, stopping.");
   Stop($signame);
}

sub set_status {
  # Change a source's status and announce it to the source's subscribers.
  #   $topic  - source name (the key used in %status/%sources)
  #   $status - one of the $SOURCE_* values
  #   $mode   - false: do nothing if $status equals the recorded status,
  #             otherwise record it in %status and announce it;
  #             true: announce only, leaving %status untouched
  #   $extra  - optional detail appended to the status text
  my ($topic, $status, $mode, $extra) = @_;
  # String comparison on purpose: an unrecorded (undef) status must not
  # compare equal to $SOURCE_OK (0).
  return if ((not $mode) and ($status eq $status{$topic}));

  my $message = $status_messages{$status};
  $message = "$message: $extra" if $extra;

  $status{$topic} = $status unless $mode;

  log2("Setting $topic 's status to $status (" . $message .  ")");
  my $presence = Net::Jabber::Presence->new();
  $presence->SetType("unavailable") if $status == $SOURCE_OFFLINE;
  # Describe the status being announced (the argument), not the one on
  # record: in announce-only mode the two can differ.
  if ($status != $SOURCE_OK) {
    my $show = $status_show{$status};
    $presence->SetShow($show) if $show;    # never emit an empty <show/>
    $presence->SetStatus($message);
  }
  $presence->SetFrom($topic . '@' . SERVICE_NAME);
  deliver_sub($presence, $topic);
}

sub Stop {
  # End the run: announce every source as offline (announce-only mode, so the
  # recorded statuses survive), untie the DB hashes, disconnect, exit 0.
  my ($reason) = @_;
  log1("Exiting because of $reason ...\n");
  foreach my $source (keys %sources) {
    set_status($source, $SOURCE_OFFLINE, 1);
  }
  foreach my $db (\%reg, \%cache, \%sources, \%status) {
    untie %$db;
  }
  $Connection->Disconnect();
  exit(0);
}

sub do_connect {
    # Open the component connection and register the stanza callbacks.
    # Optional positional arguments (each falls back to its constant when
    # false): $server, $port, $secret, $name.
    # Returns the connected Net::Jabber::Component; on failure logs the
    # problem and exits with a non-zero status.
    # (No prototype: the former empty "()" contradicted these optional args.)
    my $server = (shift or SERVER);
    my $port   = (shift or PORT);
    my $secret = (shift or SECRET);
    my $name   = (shift or SERVICE_NAME);

    my $conn = Net::Jabber::Component->new(debuglevel=>0, debugfile=>"stdout");

    my $status = $conn->Connect("hostname" => $server, "port" => $port, "secret" => $secret, "componentname" => $name);
    if (!(defined($status))) {
      log1( "ERROR:  Jabber server is down or connection was not allowed.");
      log1( "        ($!)\n");
      exit(1);
    }

    $conn->SetCallBacks("message" => \&InMessage, "presence" => \&InPresence, "iq" => \&InIQ);

    # Report the address actually used, which may differ from the constants.
    log2("Connected to $server:$port...\n");

    return $conn;
}

sub init_status {
    # Re-announce each source's last recorded status to its subscribers,
    # in announce-only mode so %status itself is not rewritten.
    my @topics = sort keys %sources;
    foreach my $name (@topics) {
      set_status($name, $status{$name}, 1);
    }
    log2("Done bringing sources up to previous status");
}

sub log1 {
  # WARN-level line on STDERR, emitted when VERBOSE is 1 or higher.
  VERBOSE >= 1 or return;
  my ($text) = @_;
  print STDERR "WARN: $text\n";
}

sub log2 {
  # INFO-level line on the default output, emitted when VERBOSE is 2 or higher.
  VERBOSE >= 2 or return;
  my ($text) = @_;
  print "INFO: $text\n";
}

sub log3 {
  # Debug-level line on the default output, emitted when VERBOSE is 3 or higher.
  VERBOSE >= 3 or return;
  my ($text) = @_;
  print "DBUG: $text\n";
}
