index 84097f391a1ea4f0c4cc6b24b5c0135280c3bb71..7bd0ba2709faed68e4ee3ec4ab50af4cf80f59f6 100755 (executable)
use Net::LDAP;
use Net::LDAP::Util qw(:escape);
use Time::HiRes qw( usleep);
+use DateTime;
my $modules_path = "/usr/lib/gosa-si/modules";
use lib "/usr/lib/gosa-si/modules";
our $prg= basename($0);
our $global_kernel;
-my (%cfg_defaults, $foreground, $verbose, $ping_timeout);
+my ($foreground, $ping_timeout);
my ($bus_activ, $bus, $msg_to_bus, $bus_cipher);
my ($server);
my ($gosa_server, $job_queue_timeout, $job_queue_loop_delay);
my ($messaging_db_loop_delay);
my ($known_modules);
-my ($pid_file, $procid, $pid, $log_file);
-my ($arp_activ, $arp_fifo);
+my ($procid, $pid);
+my ($arp_fifo);
my ($xml);
my $sources_list;
my $max_clients;
"targettag DEFAULT 'none'",
"xmlmessage DEFAULT 'none'",
"module DEFAULT 'none'",
+ "sessionid DEFAULT '0'",
);
# holds all gosa jobs
# known_clients
$sql_statement = "SELECT * FROM known_clients WHERE hostname='$address'";
$res = $known_clients_db->select_dbentry($sql_statement);
- if( keys(%$res) > 0) {
+ if( keys(%$res) == 1) {
$act_status = $res->{1}->{'status'};
if ($act_status eq "down" && $new_status eq "down") {
$sql_statement = "DELETE FROM known_clients WHERE hostname='$address'";
# known_server
$sql_statement = "SELECT * FROM $known_server_tn WHERE hostname='$address'";
$res = $known_server_db->select_dbentry($sql_statement);
- if( keys(%$res) > 0 ) {
+ if( keys(%$res) == 1) {
$act_status = $res->{1}->{'status'};
if ($act_status eq "down" && $new_status eq "down") {
$sql_statement = "DELETE FROM known_server WHERE hostname='$address'";
}
}
-sub _start {
- my ($kernel) = $_[KERNEL];
- &trigger_db_loop($kernel);
- $global_kernel = $kernel;
- $kernel->yield('register_at_foreign_servers');
- $kernel->yield('create_fai_server_db', $fai_server_tn );
- $kernel->yield('create_fai_release_db', $fai_release_tn );
- $kernel->sig(USR1 => "sig_handler");
- $kernel->sig(USR2 => "create_packages_list_db");
-}
sub sig_handler {
my ($kernel, $signal) = @_[KERNEL, ARG0] ;
sub msg_to_decrypt {
- my ($session, $heap) = @_[SESSION, HEAP];
+ my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
my $session_id = $session->ID;
my ($msg, $msg_hash, $module);
my $error = 0;
xmlmessage=>$msg,
timestamp=>&get_time,
module=>$module,
+ sessionid=>$session_id,
} );
if ($res != 0) {
# TODO ist das mit $! so ok???
sub next_task {
- my ($session, $heap) = @_[SESSION, HEAP];
- my $task = POE::Wheel::Run->new(
- Program => sub { process_task($session, $heap) },
+ my ($session, $heap, $task) = @_[SESSION, HEAP, ARG0];
+ my $running_task = POE::Wheel::Run->new(
+ Program => sub { process_task($session, $heap, $task) },
StdioFilter => POE::Filter::Reference->new(),
StdoutEvent => "task_result",
StderrEvent => "task_debug",
CloseEvent => "task_done",
);
-
- $heap->{task}->{ $task->ID } = $task;
+ $heap->{task}->{ $running_task->ID } = $running_task;
}
sub handle_task_result {
sub process_task {
no strict "refs";
- my ($session, $heap, $input) = @_;
- my $session_id = $session->ID;
+ my ($session, $heap, $task) = @_;
my $error = 0;
my $answer_l;
my ($answer_header, @answer_target_l, $answer_source);
my $client_answer = "";
- ##################################################
- # fetch first unprocessed message from incoming_db
- # sometimes the program is faster than sqlite, so wait until informations are present at db
- my $id_sql;
- my $id_res;
- my $message_id;
-# TODO : das hier ist sehr sehr unschön
-# to be tested: speed enhancement with usleep 100000???
- while (1) {
- $id_sql = "SELECT min(id) FROM $incoming_tn WHERE (NOT headertag LIKE 'answer%')";
- $id_res = $incoming_db->exec_statement($id_sql);
- $message_id = @{@$id_res[0]}[0];
- if (defined $message_id) { last }
- usleep(100000);
- }
-
- # fetch new message from incoming_db
- my $sql = "SELECT * FROM $incoming_tn WHERE id=$message_id";
- my $res = $incoming_db->exec_statement($sql);
-
# prepare all variables needed to process message
- my $msg = @{@$res[0]}[4];
- my $incoming_id = @{@$res[0]}[0];
- my $module = @{@$res[0]}[5];
- my $header = @{@$res[0]}[2];
+ my $msg = $task->{'xmlmessage'};
+ my $incoming_id = $task->{'id'};
+ my $module = $task->{'module'};
+ my $header = $task->{'headertag'};
+ my $session_id = $task->{'sessionid'};
my $msg_hash = $xml->XMLin($msg, ForceArray=>1);
-
- # messages which are an answer to a still running process should not be processed here
- if ($header =~ /^answer_(\d+)/) {
- return;
- }
-
- # delete message from db
- my $delete_sql = "DELETE FROM $incoming_tn WHERE id=$incoming_id";
- my $delete_res = $incoming_db->exec_statement($delete_sql);
+ my $source = @{$msg_hash->{'source'}}[0];
+
+ # set timestamp of incoming client uptodate, so client will not
+ # be deleted from known_clients because of expiration
+ my $act_time = &get_time();
+ my $sql = "UPDATE $known_clients_tn SET timestamp='$act_time' WHERE hostname='$source'";
+ my $res = $known_clients_db->exec_statement($sql);
######################
# process incoming msg
if( $error == 0) {
- daemon_log("$session_id INFO: Incoming msg (session_id=$session_id) with header '".@{$msg_hash->{'header'}}[0].
- "' from '".$heap->{'remote_ip'}."'", 5);
+ daemon_log("$session_id INFO: Incoming msg (session_id=$session_id) with header '".@{$msg_hash->{'header'}}[0]."'", 5);
daemon_log("$session_id DEBUG: Processing module ".$module, 7);
$answer_l = &{ $module."::process_incoming_msg" }($msg, $msg_hash, $session_id);
}
+sub session_start {
+ my ($kernel) = $_[KERNEL];
+ &trigger_db_loop($kernel);
+ $global_kernel = $kernel;
+ $kernel->yield('register_at_foreign_servers');
+ $kernel->yield('create_fai_server_db', $fai_server_tn );
+ $kernel->yield('create_fai_release_db', $fai_release_tn );
+ $kernel->yield('watch_for_next_tasks');
+ $kernel->sig(USR1 => "sig_handler");
+ $kernel->sig(USR2 => "create_packages_list_db");
+}
sub trigger_db_loop {
my ($kernel) = @_ ;
my $res = $known_clients_db->select_dbentry( $sql_statement );
my $act_time = int(&get_time());
+
while ( my ($hit_num, $hit) = each %$res) {
- my $expired_timestamp = int($hit->{'timestamp'}) + (2 * int($hit->{'keylifetime'}));
+ my $expired_timestamp = int($hit->{'timestamp'});
+ $expired_timestamp =~ /(\d{4})(\d\d)(\d\d)(\d\d)(\d\d)(\d\d)/;
+ my $dt = DateTime->new( year => $1,
+ month => $2,
+ day => $3,
+ hour => $4,
+ minute => $5,
+ second => $6,
+ );
+
+ $dt->add( seconds => 2 * int($hit->{'keylifetime'}) );
+ $expired_timestamp = $dt->ymd('').$dt->hms('')."\n";
if ($act_time > $expired_timestamp) {
my $hostname = $hit->{'hostname'};
my $del_sql = "DELETE FROM $known_clients_tn WHERE hostname='$hostname'";
my $del_res = $known_clients_db->exec_statement($del_sql);
- &main::daemon_log("0 INFO: timestamp of client '$hostname' is expired, client will be deleted from known_clients_db", 5);
+ &main::daemon_log("0 INFO: timestamp '".$hit->{'timestamp'}."' of client '$hostname' is expired('$expired_timestamp'), client will be deleted from known_clients_db", 5);
}
}
}
+sub watch_for_next_tasks {
+ my ($kernel,$heap) = @_[KERNEL, HEAP];
+
+ my $sql = "SELECT * FROM $incoming_tn";
+ my $res = $incoming_db->select_dbentry($sql);
+
+ while ( my ($hit_num, $hit) = each %$res) {
+ my $headertag = $hit->{'headertag'};
+ if ($headertag =~ /^answer_(\d+)/) {
+ # do not start processing, this message is for a still running POE::Wheel
+ next;
+ }
+ my $message_id = $hit->{'id'};
+ $kernel->yield('next_task', $hit);
+
+ my $sql = "DELETE FROM $incoming_tn WHERE id=$message_id";
+ my $res = $incoming_db->exec_statement($sql);
+ }
+
+ $kernel->delay_set('watch_for_next_tasks', 1);
+}
+
+
sub get_ldap_handle {
my ($session_id) = @_;
my $heap;
POE::Component::Server::TCP->new(
+ Alias => "TCP_SERVER",
Port => $server_port,
ClientInput => sub {
my ($kernel, $input) = @_[KERNEL, ARG0];
push(@tasks, $input);
push(@msgs_to_decrypt, $input);
$kernel->yield("msg_to_decrypt");
- $kernel->yield("next_task");
},
InlineStates => {
- next_task => \&next_task,
msg_to_decrypt => \&msg_to_decrypt,
+ next_task => \&next_task,
task_result => \&handle_task_result,
task_done => \&handle_task_done,
task_debug => \&handle_task_debug,
@@ -2841,9 +2856,14 @@ daemon_log("start socket for incoming xml messages at port '$server_port' ", 1);
# create session for repeatedly checking the job queue for jobs
POE::Session->create(
inline_states => {
- _start => \&_start,
+ _start => \&session_start,
register_at_foreign_servers => \®ister_at_foreign_servers,
- sig_handler => \&sig_handler,
+ sig_handler => \&sig_handler,
+ next_task => \&next_task,
+ task_result => \&handle_task_result,
+ task_done => \&handle_task_done,
+ task_debug => \&handle_task_debug,
+ watch_for_next_tasks => \&watch_for_next_tasks,
watch_for_new_messages => \&watch_for_new_messages,
watch_for_delivery_messages => \&watch_for_delivery_messages,
watch_for_done_messages => \&watch_for_done_messages,