Code

* gosa-si-server-nobus
authorrettenbe <rettenbe@594d385d-05f5-0310-b6e9-bd551577e9d8>
Tue, 27 May 2008 08:43:10 +0000 (08:43 +0000)
committerrettenbe <rettenbe@594d385d-05f5-0310-b6e9-bd551577e9d8>
Tue, 27 May 2008 08:43:10 +0000 (08:43 +0000)
* under construction

git-svn-id: https://oss.gonicus.de/repositories/gosa/trunk@11015 594d385d-05f5-0310-b6e9-bd551577e9d8

gosa-si/gosa-si-server-nobus

index 84097f391a1ea4f0c4cc6b24b5c0135280c3bb71..fd3a179cac83afadd808349a574b2a105ae308b2 100755 (executable)
@@ -127,6 +127,7 @@ my @incoming_col_names = ("id INTEGER PRIMARY KEY",
                "targettag DEFAULT 'none'",
         "xmlmessage DEFAULT 'none'",
         "module DEFAULT 'none'",
+        "sessionid DEFAULT '0'",
         );
 
 # holds all gosa jobs
@@ -939,7 +940,7 @@ sub send_msg_to_target {
     # known_clients
     $sql_statement = "SELECT * FROM known_clients WHERE hostname='$address'";
     $res = $known_clients_db->select_dbentry($sql_statement);
-    if( keys(%$res) > 0) {
+    if( keys(%$res) == 1) {
         $act_status = $res->{1}->{'status'};
         if ($act_status eq "down" && $new_status eq "down") {
             $sql_statement = "DELETE FROM known_clients WHERE hostname='$address'";
@@ -954,6 +955,8 @@ sub send_msg_to_target {
                 daemon_log("$session_id INFO: set '$address' from status '$act_status' to '$new_status'", 5);
             }
         }
+    } else {
+        daemon_log("$session_id WARNING: no or more hits found for host '$address' in known_server_db", 3);
     }
 
     # known_server
@@ -1005,16 +1008,6 @@ sub update_jobdb_status_for_send_msgs {
     }
 }
 
-sub _start {
-    my ($kernel) = $_[KERNEL];
-    &trigger_db_loop($kernel);
-    $global_kernel = $kernel;
-    $kernel->yield('register_at_foreign_servers');
-       $kernel->yield('create_fai_server_db', $fai_server_tn );
-       $kernel->yield('create_fai_release_db', $fai_release_tn );
-       $kernel->sig(USR1 => "sig_handler");
-       $kernel->sig(USR2 => "create_packages_list_db");
-}
 
 sub sig_handler {
        my ($kernel, $signal) = @_[KERNEL, ARG0] ;
@@ -1025,7 +1018,7 @@ sub sig_handler {
 
 
 sub msg_to_decrypt {
-    my ($session, $heap) = @_[SESSION, HEAP];
+    my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
     my $session_id = $session->ID;
     my ($msg, $msg_hash, $module);
     my $error = 0;
@@ -1073,6 +1066,7 @@ sub msg_to_decrypt {
                 xmlmessage=>$msg,
                 timestamp=>&get_time,
                 module=>$module,
+                sessionid=>$session_id,
                 } );
         if ($res != 0) {
                        # TODO ist das mit $! so ok???
@@ -1084,15 +1078,14 @@ sub msg_to_decrypt {
 
 
 sub next_task {
-    my ($session, $heap) = @_[SESSION, HEAP];
+    my ($session, $heap, $task) = @_[SESSION, HEAP, ARG0];
     my $task = POE::Wheel::Run->new(
-            Program => sub { process_task($session, $heap) },
+            Program => sub { process_task($session, $heap, $task) },
             StdioFilter => POE::Filter::Reference->new(),
             StdoutEvent  => "task_result",
             StderrEvent  => "task_debug",
             CloseEvent   => "task_done",
             );
-
     $heap->{task}->{ $task->ID } = $task;
 }
 
@@ -1127,54 +1120,24 @@ sub handle_task_done {
 
 sub process_task {
     no strict "refs";
-    my ($session, $heap, $input) = @_;
-    my $session_id = $session->ID;
+    my ($session, $heap, $task) = @_;
     my $error = 0;
     my $answer_l;
     my ($answer_header, @answer_target_l, $answer_source);
     my $client_answer = "";
 
-       ##################################################
-       # fetch first unprocessed message from incoming_db
-    # sometimes the program is faster than sqlite, so wait until informations are present at db
-    my $id_sql;
-    my $id_res;
-    my $message_id;
-# TODO : das hier ist sehr sehr unschön       
-# to be tested: speed enhancement with usleep 100000???
-    while (1) {
-        $id_sql = "SELECT min(id) FROM $incoming_tn WHERE (NOT headertag LIKE 'answer%')"; 
-        $id_res = $incoming_db->exec_statement($id_sql);
-        $message_id = @{@$id_res[0]}[0];
-        if (defined $message_id) { last }
-        usleep(100000);
-    }
-
-    # fetch new message from incoming_db
-    my $sql = "SELECT * FROM $incoming_tn WHERE id=$message_id"; 
-    my $res = $incoming_db->exec_statement($sql);
-
     # prepare all variables needed to process message
-    my $msg = @{@$res[0]}[4];
-    my $incoming_id = @{@$res[0]}[0];
-    my $module = @{@$res[0]}[5];
-    my $header =  @{@$res[0]}[2];
+    my $msg = $task->{'xmlmessage'};
+    my $incoming_id = $task->{'id'};
+    my $module = $task->{'module'};
+    my $header =  $task->{'headertag'};
+    my $session_id = $task->{'sessionid'};
     my $msg_hash = $xml->XMLin($msg, ForceArray=>1);
 
-    # messages which are an answer to a still running process should not be processed here
-    if ($header =~ /^answer_(\d+)/) {
-        return;
-    }
-   
-    # delete message from db 
-    my $delete_sql = "DELETE FROM $incoming_tn WHERE id=$incoming_id";
-    my $delete_res = $incoming_db->exec_statement($delete_sql);
-
     ######################
     # process incoming msg
     if( $error == 0) {
-        daemon_log("$session_id INFO: Incoming msg (session_id=$session_id) with header '".@{$msg_hash->{'header'}}[0].
-                               "' from '".$heap->{'remote_ip'}."'", 5); 
+        daemon_log("$session_id INFO: Incoming msg (session_id=$session_id) with header '".@{$msg_hash->{'header'}}[0]."'", 5); 
         daemon_log("$session_id DEBUG: Processing module ".$module, 7);
         $answer_l = &{ $module."::process_incoming_msg" }($msg, $msg_hash, $session_id);
 
@@ -1332,6 +1295,17 @@ sub process_task {
 
 }
 
+sub session_start {
+    my ($kernel) = $_[KERNEL];
+    &trigger_db_loop($kernel);
+    $global_kernel = $kernel;
+    $kernel->yield('register_at_foreign_servers');
+       $kernel->yield('create_fai_server_db', $fai_server_tn );
+       $kernel->yield('create_fai_release_db', $fai_release_tn );
+    $kernel->yield('watch_for_next_tasks');
+       $kernel->sig(USR1 => "sig_handler");
+       $kernel->sig(USR2 => "create_packages_list_db");
+}
 
 sub trigger_db_loop {
        my ($kernel) = @_ ;
@@ -1654,6 +1628,29 @@ sub watch_for_old_known_clients {
 }
 
 
+sub watch_for_next_tasks {
+    my ($kernel,$heap) = @_[KERNEL, HEAP];
+    
+    my $sql = "SELECT * FROM $incoming_tn";
+    my $res = $incoming_db->select_dbentry($sql);
+
+    while ( my ($hit_num, $hit) = each %$res) {
+        my $headertag = $hit->{'headertag'};
+        if ($headertag =~ /^answer_(\d+)/) {
+            # do not start processing, this message is for a still running POE::Wheel
+            next;
+        }
+        my $message_id = $hit->{'id'};
+        $kernel->yield('next_task', $hit);
+
+        my $sql = "DELETE FROM $incoming_tn WHERE id=$message_id";
+        my $res = $incoming_db->exec_statement($sql);
+    }
+
+    $kernel->delay_set('watch_for_next_tasks', 1); 
+}
+
+
 sub get_ldap_handle {
        my ($session_id) = @_;
        my $heap;
@@ -2818,17 +2815,17 @@ foreach my $foreign_server (@foreign_server_list) {
 
 
 POE::Component::Server::TCP->new(
+    Alias => "TCP_SERVER",
        Port => $server_port,
        ClientInput => sub {
         my ($kernel, $input) = @_[KERNEL, ARG0];
         push(@tasks, $input);
         push(@msgs_to_decrypt, $input);
         $kernel->yield("msg_to_decrypt");
-        $kernel->yield("next_task");
         },
     InlineStates => {
-        next_task => \&next_task,
         msg_to_decrypt => \&msg_to_decrypt,
+        next_task => \&next_task,
         task_result => \&handle_task_result,
         task_done   => \&handle_task_done,
         task_debug  => \&handle_task_debug,
@@ -2841,9 +2838,14 @@ daemon_log("start socket for incoming xml messages at port '$server_port' ", 1);
 # create session for repeatedly checking the job queue for jobs
 POE::Session->create(
        inline_states => {
-               _start => \&_start,
+               _start => \&session_start,
         register_at_foreign_servers => \&register_at_foreign_servers,
-               sig_handler => \&sig_handler,
+        sig_handler => \&sig_handler,
+        next_task => \&next_task,
+        task_result => \&handle_task_result,
+        task_done   => \&handle_task_done,
+        task_debug  => \&handle_task_debug,
+        watch_for_next_tasks => \&watch_for_next_tasks,
         watch_for_new_messages => \&watch_for_new_messages,
         watch_for_delivery_messages => \&watch_for_delivery_messages,
         watch_for_done_messages => \&watch_for_done_messages,