From: rettenbe Date: Thu, 10 Jul 2008 14:54:38 +0000 (+0000) Subject: progress display of jobs in job_queue from foreign si-server too X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=2a118c6e1ce0d181a1083ca47bb3eae46fbdfc62;p=gosa.git progress display of jobs in job_queue from foreign si-server too git-svn-id: https://oss.gonicus.de/repositories/gosa/trunk@11596 594d385d-05f5-0310-b6e9-bd551577e9d8 --- diff --git a/gosa-si/gosa-si-server b/gosa-si/gosa-si-server index c9cbfa3ad..b6d603042 100755 --- a/gosa-si/gosa-si-server +++ b/gosa-si/gosa-si-server @@ -86,7 +86,7 @@ our (%cfg_defaults, $log_file, $pid_file, $arp_activ, $gosa_unit_tag, $GosaPackages_key, $gosa_ip, $gosa_port, $gosa_timeout, $foreign_server_string, $server_domain, $ServerPackages_key, $foreign_servers_register_delay, - $wake_on_lan_passwd, + $wake_on_lan_passwd, $job_synchronization, $modified_jobs_loop_delay, ); # additional variable which should be globaly accessable @@ -146,6 +146,8 @@ my @job_queue_col_names = ("id INTEGER PRIMARY KEY", "xmlmessage DEFAULT 'none'", "macaddress DEFAULT 'none'", "plainname DEFAULT 'none'", + "siserver DEFAULT 'none'", + "modified DEFAULT '0'", ); # holds all other gosa-si-server @@ -253,6 +255,8 @@ my $max_children = 2; "domain" => [\$server_domain, ""], "key" => [\$ServerPackages_key, "none"], "key-lifetime" => [\$foreign_servers_register_delay, 120], + "job-synchronization" => [\$job_synchronization, "true"], + "synchronization-loop" => [\$modified_jobs_loop_delay, 5], } ); @@ -1132,7 +1136,7 @@ sub msg_to_decrypt { if ($done) { # if a job or a gosa message comes from a foreign server, fake module to GosaPackages # so gosa-si-server knows how to process this kind of messages - if ($header =~ /^gosa_/ || $header =~ /job_/) { + if ($header =~ /^gosa_/ || $header =~ /^job_/) { $module = "GosaPackages"; } @@ -1466,6 +1470,7 @@ sub session_start { $kernel->sig(USR2 => "recreate_packages_db"); $kernel->delay_set('watch_for_new_jobs', $job_queue_loop_delay); $kernel->delay_set('watch_for_done_jobs', $job_queue_loop_delay); + $kernel->delay_set('watch_for_modified_jobs', $modified_jobs_loop_delay); $kernel->delay_set('watch_for_new_messages', $messaging_db_loop_delay); $kernel->delay_set('watch_for_delivery_messages', $messaging_db_loop_delay); $kernel->delay_set('watch_for_done_messages', $messaging_db_loop_delay); @@ -1490,12 +1495,50 @@ sub watch_for_done_jobs { } +# if a job got an update or was modified anyway, send to all other si-server an update message +# of this jobs +sub watch_for_modified_jobs { + my ($kernel,$heap) = @_[KERNEL, HEAP]; + + my $sql_statement = "SELECT * FROM $job_queue_tn WHERE ((siserver='localhost') AND (modified='1'))"; + my $res = $job_db->select_dbentry( $sql_statement ); + + # if db contains no jobs which should be update, do nothing + if (keys %$res != 0) { + + # make out of the db result a gosa-si message + my $update_msg = &db_res2si_msg ($res, "foreign_job_updates", "KNOWN_SERVER", "MY_LOCAL_ADDRESS"); + + # determine all other si-server a foreign_job_updates message should be send + my $sql_statement= "SELECT * FROM $known_server_tn"; + my $res = $known_server_db->select_dbentry( $sql_statement ); + while( my ($hit_num, $hit) = each %$res ) { + my $act_update_msg = $update_msg; + my $act_target_address = $hit->{hostname}; + my $act_target_key = $hit->{hostkey}; + my ($act_target_ip, $act_target_port) = split(/:/, $act_target_address); + my $act_source_address = &get_local_ip_for_remote_ip($act_target_ip).":$act_target_port"; + + $act_update_msg =~ s/KNOWN_SERVER<\/target>/$act_target_address<\/target>/g; + $act_update_msg =~ s/MY_LOCAL_ADDRESS<\/source>/$act_source_address<\/source>/g; + &send_msg_to_target($act_update_msg, $act_target_address, $act_target_key, "foreign_job_updates" , "J"); + } + + # set jobs all jobs to modified = 0, wait until the next modification for updates of other si-server + $sql_statement = "UPDATE $job_queue_tn SET modified='0' "; + $res = $job_db->update_dbentry($sql_statement); + } + + $kernel->delay_set('watch_for_modified_jobs', $modified_jobs_loop_delay); +} + + sub watch_for_new_jobs { if($watch_for_new_jobs_in_progress == 0) { $watch_for_new_jobs_in_progress = 1; my ($kernel,$heap) = @_[KERNEL, HEAP]; - # check gosa job queue for jobs with executable timestamp + # check gosa job quaeue for jobs with executable timestamp my $timestamp = &get_time(); my $sql_statement = "SELECT * FROM $job_queue_tn WHERE status='waiting' AND (CAST (timestamp AS INTEGER)) < $timestamp ORDER BY timestamp"; my $res = $job_db->exec_statement( $sql_statement ); @@ -1586,6 +1629,7 @@ sub watch_for_new_jobs { } + sub watch_for_new_messages { my ($kernel,$heap) = @_[KERNEL, HEAP]; my @coll_user_msg; # collection list of outgoing messages @@ -3036,6 +3080,7 @@ POE::Session->create( watch_for_delivery_messages => \&watch_for_delivery_messages, watch_for_done_messages => \&watch_for_done_messages, watch_for_new_jobs => \&watch_for_new_jobs, + watch_for_modified_jobs => \&watch_for_modified_jobs, watch_for_done_jobs => \&watch_for_done_jobs, watch_for_old_known_clients => \&watch_for_old_known_clients, create_packages_list_db => \&run_create_packages_list_db, diff --git a/gosa-si/modules/GosaPackages.pm b/gosa-si/modules/GosaPackages.pm index 5587dd218..686846d85 100644 --- a/gosa-si/modules/GosaPackages.pm +++ b/gosa-si/modules/GosaPackages.pm @@ -53,8 +53,6 @@ our $server_address = "$server_ip:$server_port"; if( inet_aton($gosa_ip) ){ $gosa_ip = inet_ntoa(inet_aton($gosa_ip)); } $main::gosa_address = "$gosa_ip:$gosa_port"; -# create general settings for this module -#y $gosa_cipher = &create_ciphering($gosa_passwd); my $xml = new XML::Simple(); # import local events @@ -105,63 +103,6 @@ sub read_configfile { } } -# moved to GosaSupportDaemon: 03-06-2008: rettenbe -#=== FUNCTION ================================================================ -# NAME: get_interface_for_ip -# PARAMETERS: ip address (i.e. 192.168.0.1) -# RETURNS: array: list of interfaces if ip=0.0.0.0, matching interface if found, undef else -# DESCRIPTION: Uses proc fs (/proc/net/dev) to get list of interfaces. -#=============================================================================== -#sub get_interface_for_ip { -# my $result; -# my $ip= shift; -# if ($ip && length($ip) > 0) { -# my @ifs= &get_interfaces(); -# if($ip eq "0.0.0.0") { -# $result = "all"; -# } else { -# foreach (@ifs) { -# my $if=$_; -# if(get_ip($if) eq $ip) { -# $result = $if; -# } -# } -# } -# } -# return $result; -#} - -# moved to GosaSupportDaemon: 03-06-2008: rettenbe -#=== FUNCTION ================================================================ -# NAME: get_interfaces -# PARAMETERS: none -# RETURNS: (list of interfaces) -# DESCRIPTION: Uses proc fs (/proc/net/dev) to get list of interfaces. -#=============================================================================== -#sub get_interfaces { -# my @result; -# my $PROC_NET_DEV= ('/proc/net/dev'); -# -# open(PROC_NET_DEV, "<$PROC_NET_DEV") -# or die "Could not open $PROC_NET_DEV"; -# -# my @ifs = ; -# -# close(PROC_NET_DEV); -# -# # Eat first two line -# shift @ifs; -# shift @ifs; -# -# chomp @ifs; -# foreach my $line(@ifs) { -# my $if= (split /:/, $line)[0]; -# $if =~ s/^\s+//; -# push @result, $if; -# } -# -# return @result; -#} #=== FUNCTION ================================================================ # NAME: get_mac @@ -200,35 +141,6 @@ sub get_mac { return $result; } -# moved to GosaSupportDaemon: 03-06-2008: rettenbe -#=== FUNCTION ================================================================ -# NAME: get_ip -# PARAMETERS: interface name (i.e. eth0) -# RETURNS: (ip address) -# DESCRIPTION: Uses ioctl to get ip address directly from system. -#=============================================================================== -#sub get_ip { -# my $ifreq= shift; -# my $result= ""; -# my $SIOCGIFADDR= 0x8915; # man 2 ioctl_list -# my $proto= getprotobyname('ip'); -# -# socket SOCKET, PF_INET, SOCK_DGRAM, $proto -# or die "socket: $!"; -# -# if(ioctl SOCKET, $SIOCGIFADDR, $ifreq) { -# my ($if, $sin) = unpack 'a16 a16', $ifreq; -# my ($port, $addr) = sockaddr_in $sin; -# my $ip = inet_ntoa $addr; -# -# if ($ip && length($ip) > 0) { -# $result = $ip; -# } -# } -# -# return $result; -#} - #=== FUNCTION ================================================================ # NAME: process_incoming_msg @@ -399,6 +311,8 @@ sub process_job_msg { xmlmessage=>$msg, macaddress=>$macaddress, plainname=>$plain_name, + siserver=>"localhost", + modified=>"0", }; my $res = $main::job_db->add_dbentry($func_dic); if (not $res == 0) { diff --git a/gosa-si/modules/ServerPackages.pm b/gosa-si/modules/ServerPackages.pm index 45492912e..8d165d923 100644 --- a/gosa-si/modules/ServerPackages.pm +++ b/gosa-si/modules/ServerPackages.pm @@ -60,7 +60,7 @@ sub process_incoming_msg { # if message is being forwarded from another server, strip of header prefixes - $header =~ s/^gosa_|job_//; + $header =~ s/^gosa_|^job_//; $msg =~ s/
gosa_(\w+)<\/header>|
job_(\w+)<\/header>/
$1<\/header>/; diff --git a/gosa-si/server/events/clMessages.pm b/gosa-si/server/events/clMessages.pm index ad665646a..5ca859014 100644 --- a/gosa-si/server/events/clMessages.pm +++ b/gosa-si/server/events/clMessages.pm @@ -225,7 +225,7 @@ sub CURRENTLY_LOGGED_IN { sub GOTOACTIVATION { my ($msg, $msg_hash, $session_id) = @_; my $header = @{$msg_hash->{'header'}}[0]; - my $source = @{$msg_hash->{'target'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; my $macaddress = @{$msg_hash->{'macaddress'}}[0]; # test whether content is an empty hash or a string which is required @@ -236,7 +236,7 @@ sub GOTOACTIVATION { $header =~ s/CLMSG_//g; my $sql_statement = "UPDATE $main::job_queue_tn ". - "SET status='processing', progress='goto-activation' ". + "SET status='processing', progress='goto-activation', modified='1' ". "WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->update_dbentry($sql_statement); @@ -248,7 +248,7 @@ sub GOTOACTIVATION { sub PROGRESS { my ($msg, $msg_hash, $session_id) = @_; my $header = @{$msg_hash->{'header'}}[0]; - my $source = @{$msg_hash->{'target'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; my $macaddress = @{$msg_hash->{'macaddress'}}[0]; # test whether content is an empty hash or a string which is required @@ -259,7 +259,7 @@ sub PROGRESS { $header =~ s/CLMSG_//g; my $sql_statement = "UPDATE $main::job_queue_tn ". - "SET progress='$content' ". + "SET progress='$content', modified='1' ". "WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->update_dbentry($sql_statement); @@ -272,7 +272,7 @@ sub PROGRESS { sub FAIREBOOT { my ($msg, $msg_hash, $session_id) = @_; my $header = @{$msg_hash->{'header'}}[0]; - my $source = @{$msg_hash->{'target'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; my $macaddress = @{$msg_hash->{'macaddress'}}[0]; # test whether content is an empty hash or a string which is required @@ -283,7 +283,7 @@ sub FAIREBOOT { $header =~ s/CLMSG_//g; my $sql_statement = "UPDATE $main::job_queue_tn ". - "SET status='processing', result='$header "."$content' ". + "SET status='processing', result='$header "."$content', modified='1' ". "WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->update_dbentry($sql_statement); @@ -296,7 +296,7 @@ sub FAIREBOOT { sub TASKSKIP { my ($msg, $msg_hash, $session_id) = @_; my $header = @{$msg_hash->{'header'}}[0]; - my $source = @{$msg_hash->{'target'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; my $macaddress = @{$msg_hash->{'macaddress'}}[0]; # test whether content is an empty hash or a string which is required @@ -307,7 +307,7 @@ sub TASKSKIP { $header =~ s/CLMSG_//g; my $sql_statement = "UPDATE $main::job_queue_tn ". - "SET status='processing', result='$header "."$content' ". + "SET status='processing', result='$header "."$content', modified='1' ". "WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->update_dbentry($sql_statement); @@ -320,7 +320,8 @@ sub TASKSKIP { sub TASKBEGIN { my ($msg, $msg_hash, $session_id) = @_; my $header = @{$msg_hash->{'header'}}[0]; - my $source = @{$msg_hash->{'target'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; + my $target = @{$msg_hash->{'target'}}[0]; my $macaddress = @{$msg_hash->{'macaddress'}}[0]; # test whether content is an empty hash or a string which is required @@ -336,7 +337,7 @@ sub TASKBEGIN { || ($content eq 'savelog') ) { my $sql_statement = "UPDATE $main::job_queue_tn ". - "SET status='done', result='$header "."$content' ". + "SET status='done', result='$header "."$content', modified='1' ". "WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->update_dbentry($sql_statement); @@ -363,7 +364,9 @@ sub TASKBEGIN { # there is exactly one job entry in queue for this host if (keys(%$res) == 1) { &main::daemon_log("$session_id DEBUG: there is already one processing job in queue for host '$macaddress', run an update for this entry", 7); - my $sql_statement = "UPDATE $main::job_queue_tn SET result='$header $content' WHERE status='processing' AND macaddress LIKE '$macaddress'"; + my $sql_statement = "UPDATE $main::job_queue_tn ". + "SET result='$header $content', modified='1' ". + "WHERE status='processing' AND macaddress LIKE '$macaddress'"; my $err = $main::job_db->update_dbentry($sql_statement); if (not defined $err) { &main::daemon_log("$session_id ERROR: cannot update job_db entry: ".Dumper($err), 1); @@ -416,10 +419,12 @@ sub TASKBEGIN { result=>"$header $content", progress=>'none', headertag=>'trigger_action_reinstall', - targettag=>$source, + targettag=>$target, xmlmessage=>'none', macaddress=>$macaddress, plainname=>$plain_name, + modified=>'1', + siserver=>$source, }; my ($err, $error_str) = $main::job_db->add_dbentry($func_dic); if ($err != 0) { @@ -442,7 +447,8 @@ sub TASKBEGIN { sub TASKEND { my ($msg, $msg_hash, $session_id) = @_; my $header = @{$msg_hash->{'header'}}[0]; - my $source = @{$msg_hash->{'target'}}[0]; + my $target = @{$msg_hash->{'target'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; my $macaddress = @{$msg_hash->{'macaddress'}}[0]; # test whether content is an empty hash or a string which is required @@ -453,7 +459,7 @@ sub TASKEND { $header =~ s/CLMSG_//g; if ($content eq "savelog 0") { - &main::daemon_log("$session_id DEBUG: got savelog from host '$source' - jub done", 7); + &main::daemon_log("$session_id DEBUG: got savelog from host '$target' - job done", 7); my $sql_statement = "DELETE FROM $main::job_queue_tn WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->del_dbentry($sql_statement); @@ -461,7 +467,7 @@ sub TASKEND { } else { my $sql_statement = "UPDATE $main::job_queue_tn ". - "SET status='processing', result='$header "."$content' ". + "SET status='processing', result='$header "."$content', modified='1' ". "WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->update_dbentry($sql_statement); @@ -481,7 +487,7 @@ sub TASKEND { sub TASKERROR { my ($msg, $msg_hash, $session_id) = @_; my $header = @{$msg_hash->{'header'}}[0]; - my $source = @{$msg_hash->{'target'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; my $macaddress = @{$msg_hash->{'macaddress'}}[0]; # clean up header @@ -495,7 +501,7 @@ sub TASKERROR { &main::change_fai_state('error', \@{$msg_hash->{'macaddress'}}, $session_id); my $sql_statement = "UPDATE $main::job_queue_tn ". - "SET status='processing', result='$header "."$content' ". + "SET status='processing', result='$header "."$content', modified='1' ". "WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->update_dbentry($sql_statement); @@ -514,7 +520,7 @@ sub TASKERROR { sub HOOK { my ($msg, $msg_hash, $session_id) = @_; my $header = @{$msg_hash->{'header'}}[0]; - my $source = @{$msg_hash->{'target'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; my $macaddress = @{$msg_hash->{'macaddress'}}[0]; # clean up header @@ -525,7 +531,7 @@ sub HOOK { if(not ref($content) eq "STRING") { $content = ""; } my $sql_statement = "UPDATE $main::job_queue_tn ". - "SET status='processing', result='$header "."$content' ". + "SET status='processing', result='$header "."$content', modified='1' ". "WHERE status='processing' AND macaddress LIKE '$macaddress'"; &main::daemon_log("$session_id DEBUG: $sql_statement", 7); my $res = $main::job_db->update_dbentry($sql_statement); diff --git a/gosa-si/server/events/server_server_com.pm b/gosa-si/server/events/server_server_com.pm index e980dbad8..686d9f768 100644 --- a/gosa-si/server/events/server_server_com.pm +++ b/gosa-si/server/events/server_server_com.pm @@ -6,6 +6,7 @@ my @events = ( 'confirm_new_server', 'new_foreign_client', 'trigger_wake', + 'foreign_job_updates', ); @EXPORT = @events; @@ -28,6 +29,48 @@ sub get_events { } +sub foreign_job_updates { + my ($msg, $msg_hash, $session_id) = @_ ; + my $header = @{$msg_hash->{'header'}}[0]; + my $source = @{$msg_hash->{'source'}}[0]; + my $target = @{$msg_hash->{'target'}}[0]; + + my @act_keys = keys %$msg_hash; + my @jobs; + foreach my $key (@act_keys) { + if ($key =~ /answer\d+/ ) { push(@jobs, $key); } + } + + foreach my $foreign_job (@jobs) { + + # add job to job queue + my $func_dic = {table=>$main::job_queue_tn, + primkey=>['macaddress', 'headertag'], + timestamp=>@{@{$msg_hash->{$foreign_job}}[0]->{'timestamp'}}[0], + status=>@{@{$msg_hash->{$foreign_job}}[0]->{'status'}}[0], + result=>@{@{$msg_hash->{$foreign_job}}[0]->{'result'}}[0], + progress=>@{@{$msg_hash->{$foreign_job}}[0]->{'progress'}}[0], + headertag=>@{@{$msg_hash->{$foreign_job}}[0]->{'headertag'}}[0], + targettag=>@{@{$msg_hash->{$foreign_job}}[0]->{'targettag'}}[0], + xmlmessage=>@{@{$msg_hash->{$foreign_job}}[0]->{'xmlmessage'}}[0], + macaddress=>@{@{$msg_hash->{$foreign_job}}[0]->{'macaddress'}}[0], + plainname=>@{@{$msg_hash->{$foreign_job}}[0]->{'plainname'}}[0], + siserver=>$source, + modified=>"0", + }; + my $res = $main::job_db->add_dbentry($func_dic); + if (not $res == 0) { + &main::daemon_log("$session_id ERROR: ServerPackages: process_job_msg: $res", 1); + } else { + &main::daemon_log("$session_id INFO: ServerPackages: $header, job '".@{@{$msg_hash->{$foreign_job}}[0]->{'headertag'}}[0]. + "' successfully added to job queue", 5); + } + } + + return; +} + + sub new_server { my ($msg, $msg_hash, $session_id) = @_ ; my $header = @{$msg_hash->{'header'}}[0]; diff --git a/gosa-si/tests/client.php b/gosa-si/tests/client.php index 41051e16e..7620f5885 100755 --- a/gosa-si/tests/client.php +++ b/gosa-si/tests/client.php @@ -20,8 +20,8 @@ for($count = 1; $count <= $zahl; $count++) # jobdb add #$data = "
gosa_network_completition
GOSAGOSAws-muc-2
"; #$data = "
job_sayHello
10.89.1.155:2008300:01:6c:9d:b9:fa00:1B:77:04:8A:6C 20130102133908
"; - #$data = "
job_ping
GOSA 00:01:6c:9d:b9:fa 00:01:6c:9d:b9:fa19700101000000
"; - + $data = "
job_ping2
GOSA 00:01:6c:9d:b9:fa 00:01:6c:9d:b9:fa20130101000000
"; + # jobdb delete #$data = "
gosa_delete_jobdb_entry
GOSA GOSA 3
"; @@ -33,7 +33,7 @@ for($count = 1; $count <= $zahl; $count++) # jobdb update #$data = "
gosa_update_status_jobdb_entry
GOSA GOSA 1 19700101000000
"; - #$data = "
gosa_update_status_jobdb_entry
GOSAGOSA 00:01:6c:9d:b9:fa processing update
"; + $data = "
gosa_update_status_jobdb_entry
GOSAGOSA 00:01:6c:9d:b9:fa processing update
"; # jobdb query #$data = "
gosa_query_jobdb
GOSA GOSAandgt0le5
"; @@ -109,10 +109,10 @@ for($count = 1; $count <= $zahl; $count++) #$data = "
gosa_query_fai_server
GOSA 10.89.1.131:20081
"; #$data = "
gosa_ping
00:01:6c:9d:aa:16 GOSA
"; #$data = "
gosa_ping
00:01:6c:9d:b9:fb GOSA
"; - $data = "
gosa_get_dak_keyring
GOSA GOSA
"; + #$data = "
gosa_get_dak_keyring
GOSA GOSA
"; #$data = "
job_ping
GOSA 00:0c:29:02:e5:4d 00:0c:29:02:e5:4d29700101000000
"; - $data = "
gosa_network_completition
GOSA GOSA localhost
"; + #$data = "
gosa_network_completition
GOSA GOSA localhost
"; $sock->write($data); $answer = "nothing";