From b98dc6e19f92a0e974bf7c6623ebccdf08ce75a8 Mon Sep 17 00:00:00 2001 From: waynieack Date: Sun, 18 Feb 2018 23:24:37 -0600 Subject: [PATCH 1/2] Cleaned up logging, added reconnect, added support for Homi, fixed issue with crash on MH start if mqtt broker is down --- lib/mqtt.pm | 158 +++++++++++++--------------------------------------- 1 file changed, 38 insertions(+), 120 deletions(-) diff --git a/lib/mqtt.pm b/lib/mqtt.pm index 1cbfeab7f..344895c23 100644 --- a/lib/mqtt.pm +++ b/lib/mqtt.pm @@ -158,7 +158,6 @@ Notes: reconnect we need to resubscribe. There is no way to do that now (we'll need to resubscribe all the same socket related subscriptions) @FIXME: We're really not checking for ConnAck or SubAck. - @FIXME: there is no reconnect logic @FIXME: No SSL @FIXME: Lots of error checking needs to be done @FIXME: Use of uninitialized value @@ -197,7 +196,7 @@ my $msg_id = 1; my %MQTT_Data; -$main::Debug{mqtt} = 1; +#$main::Debug{mqtt} = 1; # ------------------------------------------------------------------------------ sub dump() { @@ -206,105 +205,13 @@ sub dump() { # ------------------------------------------------------------------------------ -=item - -Okay, I can see this is going to get complicated and require I do a rewrite of -the subscription handling. When we reconnect we want to also resubscribe. -Currently we can't do that. - -=cut - -sub mqtt_reconnect() { - my ($self) = @_; - - ### - ### Do we need to do a clean up on the existing socket before we reconnect? - ### Will a close do that for us ? - ### - $$self{socket}->close(); - - &main::print_log("*** mqtt $$self{instance} mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ") if ( $main::Debug{mqtt} || 1 ); - - ### 1) open a socket (host, port and keepalive - my $socket = IO::Socket::INET->new( - PeerAddr => $self->{host} . ':' . $self->{port}, - Timeout => $self->{keep_alive_timer}, - ); - - # Can't use this at this time - # $socket = new main::Socket_Item(undef, undef, "$host:$port", $instance); - - &main::print_log( "*** mqtt $$self{instance} Socket check #1 ($$self{keep_alive_timer}) [ $! ]: " . ( $self->isConnected() ? "Connected" : "Failed" ) ) - if ( $main::Debug{mqtt} ); - return if ( !defined($socket) ); - - $self->{socket} = $socket; - $self->{got_ping_response} = 1; - $self->{next_ping} = $self->{keep_alive_timer}; - - # -------------------------------------------------------------------------- - ### 2) Send MQTT_CONNECT - $self->send_mqtt_msg( - message_type => MQTT_CONNECT, - keep_alive_timer => $self->{keep_alive_timer}, - user_name => $self->{user_name}, - password => $self->{password} - ); - - ### 3) Check for ACK or fail - &main::print_log( "*** mqtt $$self{instance} Socket check #2 ($$self{keep_alive_timer}) [ $! ]: " . ( $self->isConnected() ? "Connected" : "Failed" ) ) - if ( $main::Debug{mqtt} ); - - my $msg = read_mqtt_msg_timeout( $self, $buf ); - if ( !$msg ) { - &main::print_log("XXX mqtt $$self{instance} No ConnAck "); - - #exit 1; - return; - } - - # We should actually get a SubAck but who is checking (yes, I know I should) - &main::print_log( "*** mqtt $$self{instance} Received: " . $msg->string ) - if ( $main::Debug{mqtt} ); - - ### ------------------------------------------------------------------------ - - ### - ### Here is where we need to make the changes to support multiple - ### subscriptions. - ### - - ### 4) Send a subscribe '#' (we'll have many of these, one for each device) - ### I don't know if this is a good idea or not but that's what I intend to do for now - $self->send_mqtt_msg( - message_type => MQTT_SUBSCRIBE, - message_id => $msg_id++, - topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } $self->{topic} ] - ); - - ### 5) Check for ACK or fail - ### we really should check for a SubAck and that it's the correct SubAck - $msg = $self->read_mqtt_msg_timeout($buf) - or &main::print_log( "*** mqtt $$self{instance} Received: " . "No SubAck" ); - &main::print_log( "*** mqtt $$self{instance} Sub 1 Received: " . "$$msg{string}" ) - if ( $main::Debug{mqtt} ); - - ### ------------------------------------------------------------------------ - - ### 6) check for data - &main::print_log("*** mqtt $$self{instance} Initializing MQTT re_connection ...") - if ( $main::Debug{mqtt} ); -} - -# ------------------------------------------------------------------------------ - =item =cut sub mqtt_connect() { my ($self) = @_; - &main::print_log("*** mqtt mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ") if ( $main::Debug{mqtt} || 1 ); + &main::print_log("*** mqtt mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ") if ( $main::Debug{mqtt} ); ### 1) open a socket (host, port and keepalive my $socket = IO::Socket::INET->new( @@ -315,7 +222,16 @@ sub mqtt_connect() { # Can't use this at this time # $socket = new main::Socket_Item(undef, undef, "$host:$port", $instance); - return if ( !defined($socket) ); + if ( !defined($socket) ) { + if ($$self{recon_timer}->inactive) { + ::print_log("*** mqtt connection for $$self{instance} failed, I will try to reconnect in 20 seconds"); + my $inst = $$self{instance}; + $$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() }); + return; + } + } + + $self->{socket} = $socket; $self->{got_ping_response} = 1; @@ -381,7 +297,7 @@ sub mqtt_connect() { sub isConnected { my ($self) = @_; - + unless( defined($$self{socket}) ) { return 0 } return $$self{socket}->connected; } @@ -392,7 +308,7 @@ sub isConnected { sub isNotConnected { my ($self) = @_; - + unless( defined($$self{socket}) ) { return 1 } return !$$self{socket}->connected; } @@ -464,7 +380,7 @@ sub new { @{ $$self{command_stack} } = (); $$self{instance} = $instance; - + $$self{recon_timer} = ::Timer::new(); $$self{host} = "$host" || "127.0.0.1"; $$self{port} = $port || 1883; @@ -500,13 +416,13 @@ sub new { ### ------------------------------------------------------------------------ $self->mqtt_connect(); - &main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n") - unless $self; - return unless $self; + unless ($self) { + &main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n"); + return; + } # Hey what happens when we fail ? #$MQTT_Data{$instance}{self} = $self; - if ( 1 == scalar( keys %MQTT_Data ) ) { # Add hooks on first call only &main::print_log("*** mqtt added MQTT check_for_data ..."); &::MainLoop_pre_add_hook( \&mqtt::check_for_data, 1 ); @@ -546,10 +462,10 @@ sub check_for_data { ### @FIXME: failed connection if ( 'off' ne $self->{state} ) { - # First say something - &main::print_log("*** mqtt $inst failed ($$self{host}/$$self{port}/$$self{topic})"); - - # Then do something (reconnect) + if ($$self{recon_timer}->inactive) { + ::print_log("*** mqtt $inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds"); + $$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() }); + } # check the state to see if it's off already @@ -709,12 +625,17 @@ sub read_mqtt_msg { # We get no bytes if there is an error or the socket has closed unless ($bytes) { - &main::print_log( "*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) ); + my $inst = $$self{instance}; + if ($$self{recon_timer}->inactive) { + ::print_log( "*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) ); + ::print_log( "*** mqtt instance $$self{instance} will try to reconnect in 20 seconds"); + $$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() }); + } # Not a permanent solution just a way to keep debugging - &main::print_log( "*** mqtt deleting $$self{instance}\n" . Dumper( \$self ) ) - if ( $main::Debug{mqtt} ); - delete( $MQTT_Data{ $$self{instance} } ); + #&main::print_log( "*** mqtt deleting $$self{instance}\n" . Dumper( \$self ) ) + # if ( $main::Debug{mqtt} ); + #delete( $MQTT_Data{ $$self{instance} } ); return; } @@ -770,13 +691,10 @@ sub read_mqtt_msg_timeout { sub set { my ( $self, $msg, $set_by ) = @_; - if ( $main::Debug{mqtt} || 1 ) { + if ( $main::Debug{mqtt} ) { my $xStr = defined($msg) ? "($msg)" : "undefined message"; - $xStr .= defined($set_by) ? ", ($set_by)" : ", undefined set_by"; - $xStr .= - ", Obj: " . defined( $$self{object_name} ) - ? ", $$self{object_name}" - : ", undefined object_name"; # @FIXME: Use of uninitialized value + $xStr .= defined($set_by) ? ", ($set_by)" : ", undefined set_by, Obj: "; + $xStr .= defined($$self{object_name}) ? ", $$self{object_name}" : ", undefined object_name"; # @FIXME: Use of uninitialized value &main::print_log("*** mqtt mqtt set $$self{instance}: [$xStr]"); &main::print_log( @@ -960,11 +878,11 @@ sub parse_data_to_obj { # for my $obj ( @{ $$self{objects} } ) { - if ( "$$obj{topic}" eq "$$msg{topic}" ) { + if ( "$$obj{topic}" eq "$$msg{topic}" || "$$obj{topic}" eq "$$msg{topic}/set" ) { $obj->set( $$msg{message}, $self, ); - } + } else { - #&main::print_log ("***mqtt mqtt obj ($$obj{topic}) vs ($$msg{topic})"); + &main::print_log ("***mqtt mqtt obj ($$obj{topic}) vs ($$msg{topic})") if ( $main::Debug{mqtt} ); } } From eabaea854f9f013f147106537e4820c2b9a31fb8 Mon Sep 17 00:00:00 2001 From: waynieack Date: Sun, 18 Feb 2018 23:38:49 -0600 Subject: [PATCH 2/2] Cleaned some of the documentation --- lib/mqtt.pm | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/mqtt.pm b/lib/mqtt.pm index 344895c23..34999a83e 100644 --- a/lib/mqtt.pm +++ b/lib/mqtt.pm @@ -909,9 +909,11 @@ use Data::Dumper; =over - =item name: the 'friendly' name of the squeezebox in squeezecenter. This parameter is used to link this object to the correct status messages in the CLI interface of squeezecenter + =item name: the name of the object seen in Misterhouse - =item interface: the object that is the CLI interface to assign this player to. + =item interface: the parent (mqtt) object that holds the connection info. + + =item interface: the topic that is used to update the object state and/or control a mqtt device =back @@ -919,11 +921,9 @@ use Data::Dumper; =over - =item amplifier: the object that needs to be enabled and disabled together with the squeezebox - - =item auto_off_time: the time (in minutes) the squeezebox and the optional attached amplifier should be turned off after a playlist has ended + =item retain - =item preheat_time: the time (in seconds) the amplifier should be turned on before a notification is played if the amplifier is off. This enables the amplifier to turn on and enable the speakers before the notification is played. + =item qos =back