package Perlbal::Plugin::Cometd;

use strict;
use warnings;

use Sprocket qw(
    Service::HTTPD
    Filter::JSON
    Perlbal::Service::Connector
);
use base 'Sprocket::Service::HTTPD';

use Carp qw( croak );
# weaken() is called by the client-list cleanup code below; import it
# explicitly instead of relying on another module to export it.
use Scalar::Util qw( weaken );

# Interval, in seconds, passed to Danga::Socket->AddTimer by time_keepalive.
use constant KEEPALIVE_TIMER => 1;

# Stringified plugin instances created by new(); unregister() uses the keys
# to rebuild the hook names that register() installed.
our %objects = ();

# Constructor. Delegates to the parent class, records the instance in
# %objects and attaches the JSON filter used by iframe_filter().
sub new {
    my $class = shift;
    my $self  = $class->SUPER::new(@_);

    $objects{$self} = 1;
    $self->{filter} = Sprocket::Filter::JSON->new();

    return $self;
}

# Perlbal plugin entry point: called as Class->register( $service ).
# Creates a plugin instance, hooks start_proxy_request on the service and
# adds the 'cometd' service role. Returns 1.
sub register {
    my $service = $_[1];
    my $self    = __PACKAGE__->new();

    # NOTE(review): the keepalive timer is not started here; the call below
    # is commented out in the original source -- confirm that is intended.
#    Danga::Socket->AddTimer( KEEPALIVE_TIMER, sub { $self->time_keepalive(); } );

    $service->register_hook(
        "Sprocket|$self" => start_proxy_request => sub {
            $self->start_proxy_request(@_);
        }
    );

    # once?
    Perlbal::Service::add_role(
        cometd => sub {
            Sprocket::Perlbal::Service::Connector->new(@_);
        }
    );

    return 1;
}

# Perlbal plugin exit point: called as Class->unregister( $service ).
# Removes the hook of every known instance and the 'cometd' role. Returns 1.
sub unregister {
    my $service = $_[1];

    foreach my $key ( keys %objects ) {
        # Fixed: the original called the non-existent "uregister_hook".
        $service->unregister_hook( "Sprocket|$key" => 'start_proxy_request' );
        delete $objects{$key};
    }

    Perlbal::Service::remove_role('cometd');

    return 1;
}

# Called when the plugin is loaded. Nothing to do.
sub load {
    return 1;
}

# Called when the plugin is unloaded. Forgets all instances.
sub unload {
    %objects = ();
    warn "unload";
    return 1;
}

# Hook body for start_proxy_request.
# Returns 0 when the client has no request headers or no backend response
# headers; otherwise removes res_headers from the client and returns
# whatever handle_request( $client, $res_headers ) returns.
sub start_proxy_request {
    my $self = shift;
    my Perlbal::ClientProxy $client = shift;

    my Perlbal::HTTPHeaders $head = $client->{req_headers};
    return 0 unless $head;

    # requires patched Perlbal, backend response headers
    my Perlbal::HTTPHeaders $hd = $client->{res_headers};
    unless ( $hd ) {
        # XXX no res_headers are available when cometd isn't reproxied
        warn "You are running an old version of Perlbal, please use check out the lastest build in svn at code.sixapart.com\n";
        return 0;
    }

    delete $client->{res_headers};

    return $self->handle_request( $client, $hd );
}

# called from Sprocket::Perlbal::Service::Connector
#
# Processes one event (hash ref) or a list of events (array ref).
# Each event is timestamped and serialized, then:
#   * /meta/clients/<id> events are written to that client only (when the
#     id is known);
#   * other /meta/* events are routed to the client named by clientId and
#     may update its channel list or close it;
#   * every other event is written to all connected clients except the one
#     whose id equals the event's clientId.
# Closed clients are dropped from $self->{clients} / $self->{ids} at the end.
sub handle_event {
    my $self    = shift;
    my $objs    = shift;
    my $cleanup = 0;

    warn "handle event";

    $objs = [ $objs ] unless ( ref( $objs ) eq 'ARRAY' );

    my $time = time();

    foreach my $o ( @$objs ) {
        next unless ( ref( $o ) eq 'HASH' );
        next if ( !$o->{channel} );

        $o->{timestamp} = $time; # XXX
        my $ijson = $self->iframe_filter( $o->{channel}, $o );
        # NOTE(review): objToJson is not imported anywhere in this file;
        # presumably it is provided through the Sprocket imports -- confirm.
        my $json = objToJson( $o )."\n";

        warn "event from backend: $json";

        # Event addressed to a single client.
        if ( $o->{channel} =~ m~^/meta/clients/(.*)~ ) {
            if ( exists( $self->{ids}->{ $1 } ) ) {
                my $target = $self->{ids}->{ $1 };
                $self->_write_event( $target, $ijson, $json );
                $target->{alive_time} = $time;
                next;
            }
        }

        my $id;
        my $action;
        if ( $o->{channel} =~ m~^/meta/(.*)~ ) {
            my $meta = $1;
            warn "meta channel: $meta";

            next if ( !$o->{clientId} );

            if ( $meta eq 'internal/reconnect' && $o->{channels}
                && ref($o->{channels}) eq 'ARRAY' ) {
                $action = 'add_chs';
                $id = $o->{clientId};
                warn "channel update for $id!";
            }

            if ( $meta eq 'subscribe' && $o->{subscription} ) {
                if ( $o->{successful} ne 'false' ) {
                    # add channel
                    $action = 'add_ch';
                }
                $id = $o->{clientId};
            }

            if ( $meta eq 'unsubscribe' && $o->{subscription} ) {
                if ( $o->{successful} ne 'false' ) {
                    # remove channel
                    $action = 'rem_ch';
                }
                $id = $o->{clientId};
            }

            if ( $meta eq 'disconnect' ) {
                $action = 'dis_cli';
                $id = $o->{clientId};
            }

            if ( $meta eq 'ping' || $meta eq 'handshake'
                || $meta eq 'connect' || $meta eq 'reconnect' ) {
                # XXX is reconnect sent back to client, or is connect used
                $id = $o->{clientId};
            }

            if ( $id ) {
                my $target = $self->{ids}->{ $id };
                if ( !$target ) {
                    warn "client id: $id not in id list: ".join(',',keys %{$self->{ids}});
                    next;
                }

                # Internal channel updates are not forwarded to the client.
                # $action is undef for ping/handshake/connect/reconnect and
                # for failed (un)subscribe, so test definedness first to
                # avoid an "uninitialized value" warning.
                if ( !$action || $action ne 'add_chs' ) {
                    warn "out to $id: $json";
                    $self->_write_event( $target, $ijson, $json );
                    $target->{alive_time} = $time;
                }

                if ( $action ) {
                    if ( $action eq 'add_ch' ) {
                        $target->{scratch}{ch}->{ $o->{subscription} } = 1;
                    } elsif ( $action eq 'add_chs' ) {
                        warn "+++++adding channels to client $id";
                        foreach ( @{$o->{channels}} ) {
                            warn "adding channel $_";
                            $target->{scratch}{ch}->{ $_ } = 1;
                        }
                    } elsif ( $action eq 'rem_ch' ) {
                        # NOTE(review): the removal below is commented out in
                        # the original source, so unsubscribe leaves the
                        # channel in place -- confirm whether that is intended.
#                        delete $target->{scratch}{ch}->{ $o->{subscription} };
                    } elsif ( $action eq 'dis_cli' ) {
                        # XXX is close immediate, or at least set {closed}
                        # we need to send the disconnect in the cleanup
                        warn "closing client";
                        $target->close();
                        $cleanup = 1;
                    }
                }
                next;
            } else {
                warn "unhandled meta channel $o->{channel}";
                next;
            }
        }

        warn "at client loop";

        LISTENER:
        foreach my $listener ( @{$self->{clients}} ) {
            #next if ( $client && $listener == $client );

            my $lid = $listener->{scratch}{id};
            warn "at client: $lid";

            if ( $listener->{closed} ) {
                warn "this client is closed: $lid";
                if ( $listener->{scratch}{tunnel} eq 'iframe' ) {
                    $self->_send_disconnect_for( $listener );
                }
                $cleanup = 1;
                next;
            }

            # loop over client channels and deliver event
            # TODO more efficient
            warn "event for channel $o->{channel}";
            warn "channels this client $lid has: ".join(',',(keys %{$listener->{scratch}{ch}}));
            #if ( exists( $listener->{scratch}{ch}{ $o->{channel} } ) ) {
                if ( $o->{clientId} && $o->{clientId} eq $listener->{scratch}{id} ) {
                    warn "NOT sending event back to client\n";
                } else {
                    warn "******** delivering event on channel $o->{channel} : $lid : $json";
                    $self->_write_event( $listener, $ijson, $json );
                    if ( $listener->{scratch}{tunnel} eq 'long-polling' ) {
                        # Long-polling clients get one response per request:
                        # schedule the connection to be closed shortly.
                        if ( !$listener->{scratch}{close_after} ) {
                            $listener->{scratch}{close_after} = time() + 1;
                        }
                        next;
                    }
                }
                $listener->{alive_time} = $time;
            #}
        } # end LISTENER
    }

    # TODO client channels

    if ( $cleanup ) {
        @{$self->{clients}} = map {
            if ( !$_->{closed} ) {
                weaken( $_ );
                $_;
            } else {
                if ( $_->{scratch}{tunnel} && $_->{scratch}{tunnel} eq 'long-polling' ) {
                    # XXX right thing to do?
                } else {
                    $self->_send_disconnect_for( $_ );
                }
                delete $self->{ids}->{ $_->{scratch}{id} }
                    if ( $self->{ids}->{ $_->{scratch}{id} } );
                ();
            }
        } @{$self->{clients}};
    }
}

# Serializes $obj on channel $ch with the instance filter and writes it to
# every open client except $client (optional). Closed clients are reported
# with a /meta/disconnect message and dropped from the client list.
sub bcast_event {
    my ($self, $ch, $obj, $client) = @_;

    my $cleanup;
    my $time = time();
    my $json = $self->iframe_filter( $obj->{channel} = $ch, $obj );

    # TODO client channels
    foreach ( @{$self->{clients}} ) {
        next if ( $client && $_ == $client );

        if ( $_->{closed} ) {
            $cleanup = 1;
            next;
        }

        $_->{alive_time} = $time;
        $_->watch_write(0) if $_->write( $json );
    }

    if ( $cleanup ) {
        @{$self->{clients}} = map {
            if ( !$_->{closed} ) {
                weaken( $_ );
                $_;
            } else {
                $self->_send_disconnect_for( $_ );
                delete $self->{ids}->{ $_->{scratch}{id} };
                ();
            }
        } @{$self->{clients}};
    }
}

# Closes every open client whose scratch close_after deadline has passed.
# If any client was already closed when the scan started, closed clients
# are removed from the client list (iframe clients are reported with a
# /meta/disconnect message first).
sub close_overdue_clients {
    my $self = shift;

    my $cleanup;
    my $time = time();

    foreach my $c ( @{$self->{clients}} ) {
        if ( $c->{closed} ) {
            $cleanup = 1;
            next;
        }

        if ( my $after = $c->{scratch}{close_after} ) {
            # Fixed: the original tested "$after >= $time", which closed
            # clients whose deadline had NOT yet passed and never closed
            # the ones that were actually overdue.
            if ( $after <= $time ) {
                $c->close();
            }
        }
    }

    return if ( !$cleanup );

    @{$self->{clients}} = map {
        if ( !$_->{closed} ) {
            weaken( $_ );
            $_;
        } else {
            if ( my $tunnel = $_->{scratch}{tunnel} ) {
                # XXX
                if ( $tunnel eq 'iframe' ) {
                    $self->_send_disconnect_for( $_ );
                }
            }
            delete $self->{ids}->{ $_->{scratch}{id} };
            ();
        }
    } @{$self->{clients}};
}

# Stamps $obj with channel $ch and returns the result of freezing it with
# the instance filter (Sprocket::Filter::JSON, see new()).
sub iframe_filter {
    my ($self, $ch, $obj) = @_;

    $obj->{channel} = $ch;

    return $self->{filter}->freeze($obj);
}

# Timer callback: re-arms itself via Danga::Socket->AddTimer and closes
# overdue clients. Returns 5.
sub time_keepalive {
    my $self = shift;

    Danga::Socket->AddTimer( KEEPALIVE_TIMER, sub { $self->time_keepalive(); } );

#    $self->bcast_event( '/meta/ping' => { 'time' => time() } );
    $self->close_overdue_clients();

#    $self->multiplex_send({
#        channel => '/meta/ping',
#    });

    return 5;
}

# Builds response headers from the remaining arguments, stores them on
# $client as res_headers and returns them.
sub new_request {
    shift;
    my $client = shift;

    return $client->{res_headers} = Perlbal::HTTPHeaders->new_response(@_);
}

# Forwards the arguments (without the invocant) to
# Sprocket::Perlbal::Service::Connector::multiplex_send.
sub multiplex_send {
    shift;

    # Explicit argument list instead of the implicit-@_ "&name;" call form.
    return Sprocket::Perlbal::Service::Connector::multiplex_send(@_);
}

# Private: writes the iframe or plain JSON form of an event to $client,
# depending on the client's tunnel type, and turns write-watching off when
# write() returns true.
sub _write_event {
    my ( $self, $client, $ijson, $json ) = @_;

    my $payload = $client->{scratch}{tunnel} eq 'iframe' ? $ijson : $json;
    $client->watch_write(0) if $client->write( $payload );

    return;
}

# Private: sends a /meta/disconnect message for $client, carrying the
# client's id and the channels recorded in its scratch area.
sub _send_disconnect_for {
    my ( $self, $client ) = @_;

    return $self->multiplex_send({
        channel  => '/meta/disconnect',
        clientId => $client->{scratch}{id},
        data     => {
            channels => [ keys %{$client->{scratch}{ch}} ],
        },
    });
}

1;

__END__

=pod

=head1 NAME

Perlbal::Plugin::Cometd - Perlbal plugin for Cometd

=head1 SYNOPSIS

    # perlbal.conf

    LOAD Cometd
    LOAD vhosts

    SERVER max_connections = 10000

    CREATE POOL apache
        POOL apache ADD 127.0.0.1:81

    CREATE SERVICE apache_proxy
        SET role = reverse_proxy
        SET pool = apache
        SET persist_backend = on
        SET backend_persist_cache = 2
        SET verify_backend = on
        SET enable_reproxy = true
    ENABLE apache_proxy

    CREATE SERVICE cometd
        SET role = reverse_proxy
        SET plugins = Cometd
    ENABLE cometd

    CREATE SERVICE web
        SET listen = 10.0.0.1:80
        SET role = selector
        SET plugins = vhosts
        SET persist_client = on
        VHOST *.yoursite.com = apache_proxy
        VHOST * = apache_proxy
    ENABLE web

=head1 ABSTRACT

This plugin allows Perlbal to put clients into a push type connection state.

=head1 DESCRIPTION

This plugin works by keeping a connection open after an external webserver
has authorized the client.  That way, your valuable http processes can
continue serving other clients.

The easiest way to use this module is to set up apache on a port other than
80, and Perlbal on port 80 as in the synopsis.  Start Perlbal and use a
supported javascript library.  You can find supported libraries and more
info on the Cometd project website.

=head1 EXPORT

Nothing.


=head1 SEE ALSO

L<Perlbal>, L<Sprocket>, L<Danga::Socket>

=head1 AUTHOR

David Davis E<lt>xantus@cometd.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright 2006-2007 by David Davis

See the LICENSE file

=cut