From: rettenbe Date: Tue, 27 May 2008 08:43:10 +0000 (+0000) Subject: * gosa-si-server-nobus X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=e94fd6cf023c53eeeb7c99314f6269d2aeb61a1b;p=gosa.git * gosa-si-server-nobus * under construction git-svn-id: https://oss.gonicus.de/repositories/gosa/trunk@11015 594d385d-05f5-0310-b6e9-bd551577e9d8 --- diff --git a/gosa-si/gosa-si-server-nobus b/gosa-si/gosa-si-server-nobus index 84097f391..fd3a179ca 100755 --- a/gosa-si/gosa-si-server-nobus +++ b/gosa-si/gosa-si-server-nobus @@ -127,6 +127,7 @@ my @incoming_col_names = ("id INTEGER PRIMARY KEY", "targettag DEFAULT 'none'", "xmlmessage DEFAULT 'none'", "module DEFAULT 'none'", + "sessionid DEFAULT '0'", ); # holds all gosa jobs @@ -939,7 +940,7 @@ sub send_msg_to_target { # known_clients $sql_statement = "SELECT * FROM known_clients WHERE hostname='$address'"; $res = $known_clients_db->select_dbentry($sql_statement); - if( keys(%$res) > 0) { + if( keys(%$res) == 1) { $act_status = $res->{1}->{'status'}; if ($act_status eq "down" && $new_status eq "down") { $sql_statement = "DELETE FROM known_clients WHERE hostname='$address'"; @@ -954,6 +955,8 @@ sub send_msg_to_target { daemon_log("$session_id INFO: set '$address' from status '$act_status' to '$new_status'", 5); } } + } else { + daemon_log("$session_id WARNING: no or more hits found for host '$address' in known_server_db", 3); } # known_server @@ -1005,16 +1008,6 @@ sub update_jobdb_status_for_send_msgs { } } -sub _start { - my ($kernel) = $_[KERNEL]; - &trigger_db_loop($kernel); - $global_kernel = $kernel; - $kernel->yield('register_at_foreign_servers'); - $kernel->yield('create_fai_server_db', $fai_server_tn ); - $kernel->yield('create_fai_release_db', $fai_release_tn ); - $kernel->sig(USR1 => "sig_handler"); - $kernel->sig(USR2 => "create_packages_list_db"); -} sub sig_handler { my ($kernel, $signal) = @_[KERNEL, ARG0] ; @@ -1025,7 +1018,7 @@ sub sig_handler { sub msg_to_decrypt { - my ($session, $heap) = @_[SESSION, HEAP]; + my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP]; my $session_id = $session->ID; my ($msg, $msg_hash, $module); my $error = 0; @@ -1073,6 +1066,7 @@ sub msg_to_decrypt { xmlmessage=>$msg, timestamp=>&get_time, module=>$module, + sessionid=>$session_id, } ); if ($res != 0) { # TODO ist das mit $! so ok??? @@ -1084,15 +1078,14 @@ sub msg_to_decrypt { sub next_task { - my ($session, $heap) = @_[SESSION, HEAP]; + my ($session, $heap, $task) = @_[SESSION, HEAP, ARG0]; my $task = POE::Wheel::Run->new( - Program => sub { process_task($session, $heap) }, + Program => sub { process_task($session, $heap, $task) }, StdioFilter => POE::Filter::Reference->new(), StdoutEvent => "task_result", StderrEvent => "task_debug", CloseEvent => "task_done", ); - $heap->{task}->{ $task->ID } = $task; } @@ -1127,54 +1120,24 @@ sub handle_task_done { sub process_task { no strict "refs"; - my ($session, $heap, $input) = @_; - my $session_id = $session->ID; + my ($session, $heap, $task) = @_; my $error = 0; my $answer_l; my ($answer_header, @answer_target_l, $answer_source); my $client_answer = ""; - ################################################## - # fetch first unprocessed message from incoming_db - # sometimes the program is faster than sqlite, so wait until informations are present at db - my $id_sql; - my $id_res; - my $message_id; -# TODO : das hier ist sehr sehr unschön -# to be tested: speed enhancement with usleep 100000??? - while (1) { - $id_sql = "SELECT min(id) FROM $incoming_tn WHERE (NOT headertag LIKE 'answer%')"; - $id_res = $incoming_db->exec_statement($id_sql); - $message_id = @{@$id_res[0]}[0]; - if (defined $message_id) { last } - usleep(100000); - } - - # fetch new message from incoming_db - my $sql = "SELECT * FROM $incoming_tn WHERE id=$message_id"; - my $res = $incoming_db->exec_statement($sql); - # prepare all variables needed to process message - my $msg = @{@$res[0]}[4]; - my $incoming_id = @{@$res[0]}[0]; - my $module = @{@$res[0]}[5]; - my $header = @{@$res[0]}[2]; + my $msg = $task->{'xmlmessage'}; + my $incoming_id = $task->{'id'}; + my $module = $task->{'module'}; + my $header = $task->{'headertag'}; + my $session_id = $task->{'sessionid'}; my $msg_hash = $xml->XMLin($msg, ForceArray=>1); - # messages which are an answer to a still running process should not be processed here - if ($header =~ /^answer_(\d+)/) { - return; - } - - # delete message from db - my $delete_sql = "DELETE FROM $incoming_tn WHERE id=$incoming_id"; - my $delete_res = $incoming_db->exec_statement($delete_sql); - ###################### # process incoming msg if( $error == 0) { - daemon_log("$session_id INFO: Incoming msg (session_id=$session_id) with header '".@{$msg_hash->{'header'}}[0]. - "' from '".$heap->{'remote_ip'}."'", 5); + daemon_log("$session_id INFO: Incoming msg (session_id=$session_id) with header '".@{$msg_hash->{'header'}}[0]."'", 5); daemon_log("$session_id DEBUG: Processing module ".$module, 7); $answer_l = &{ $module."::process_incoming_msg" }($msg, $msg_hash, $session_id); @@ -1332,6 +1295,17 @@ sub process_task { } +sub session_start { + my ($kernel) = $_[KERNEL]; + &trigger_db_loop($kernel); + $global_kernel = $kernel; + $kernel->yield('register_at_foreign_servers'); + $kernel->yield('create_fai_server_db', $fai_server_tn ); + $kernel->yield('create_fai_release_db', $fai_release_tn ); + $kernel->yield('watch_for_next_tasks'); + $kernel->sig(USR1 => "sig_handler"); + $kernel->sig(USR2 => "create_packages_list_db"); +} sub trigger_db_loop { my ($kernel) = @_ ; @@ -1654,6 +1628,29 @@ sub watch_for_old_known_clients { } +sub watch_for_next_tasks { + my ($kernel,$heap) = @_[KERNEL, HEAP]; + + my $sql = "SELECT * FROM $incoming_tn"; + my $res = $incoming_db->select_dbentry($sql); + + while ( my ($hit_num, $hit) = each %$res) { + my $headertag = $hit->{'headertag'}; + if ($headertag =~ /^answer_(\d+)/) { + # do not start processing, this message is for a still running POE::Wheel + next; + } + my $message_id = $hit->{'id'}; + $kernel->yield('next_task', $hit); + + my $sql = "DELETE FROM $incoming_tn WHERE id=$message_id"; + my $res = $incoming_db->exec_statement($sql); + } + + $kernel->delay_set('watch_for_next_tasks', 1); +} + + sub get_ldap_handle { my ($session_id) = @_; my $heap; @@ -2818,17 +2815,17 @@ foreach my $foreign_server (@foreign_server_list) { POE::Component::Server::TCP->new( + Alias => "TCP_SERVER", Port => $server_port, ClientInput => sub { my ($kernel, $input) = @_[KERNEL, ARG0]; push(@tasks, $input); push(@msgs_to_decrypt, $input); $kernel->yield("msg_to_decrypt"); - $kernel->yield("next_task"); }, InlineStates => { - next_task => \&next_task, msg_to_decrypt => \&msg_to_decrypt, + next_task => \&next_task, task_result => \&handle_task_result, task_done => \&handle_task_done, task_debug => \&handle_task_debug, @@ -2841,9 +2838,14 @@ daemon_log("start socket for incoming xml messages at port '$server_port' ", 1); # create session for repeatedly checking the job queue for jobs POE::Session->create( inline_states => { - _start => \&_start, + _start => \&session_start, register_at_foreign_servers => \®ister_at_foreign_servers, - sig_handler => \&sig_handler, + sig_handler => \&sig_handler, + next_task => \&next_task, + task_result => \&handle_task_result, + task_done => \&handle_task_done, + task_debug => \&handle_task_debug, + watch_for_next_tasks => \&watch_for_next_tasks, watch_for_new_messages => \&watch_for_new_messages, watch_for_delivery_messages => \&watch_for_delivery_messages, watch_for_done_messages => \&watch_for_done_messages,