Skip to content

Commit

Permalink
Merge pull request #104 from yma96/main
Browse files Browse the repository at this point in the history
Reconnect Cassandra client and reinit session when NoHostAvailableException
  • Loading branch information
yma96 authored Oct 24, 2024
2 parents 773ea9f + 0a6808a commit 8c7bbd0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.commonjava.service.promote.tracking;

import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.Result;
Expand Down Expand Up @@ -125,7 +126,7 @@ public Optional<PromoteTrackingRecords> getTrackingRecords( String trackingId )
}

BoundStatement bound = preparedTrackingRecordQuery.bind( trackingId );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
Result<DtxPromoteRecord> records = promoteRecordMapper.map(resultSet);

Map<String, PathsPromoteResult> resultMap = new HashMap<>();
Expand Down Expand Up @@ -203,7 +204,7 @@ public void deleteTrackingRecords( String trackingId )

// Delete record(s) by tracking id
BoundStatement bound = preparedTrackingRecordDelete.bind( trackingId );
session.execute( bound );
executeSession( bound );

logger.info("Delete tracking record done, trackingId: {}", trackingId);
}
Expand Down Expand Up @@ -262,9 +263,41 @@ private PathsPromoteResult toPathsPromoteResult(String result)
public void rollbackTrackingRecord(String trackingId, PathsPromoteRequest request, Set<String> completedPaths)
{
BoundStatement bound = preparedTrackingRecordRollback.bind( trackingId, request.getPromotionId() );
session.execute( bound );
executeSession( bound );

// Update query-by-path table to set the rollback flag
updateQueryByPath(trackingId, request, completedPaths, true);
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
client.close();
client.init();
this.init();
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
client.close();
client.init();
this.init();
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,15 @@ public Session getSession( String keyspace )
} );
}

private volatile boolean closed;

public void close()
{
if ( !closed && cluster != null )
if ( cluster != null )
{
logger.info( "Close cassandra client" );
sessions.forEach( ( key, value ) -> value.close() );
sessions.clear();
cluster.close();
cluster = null;
closed = true;
}
}

Expand Down

0 comments on commit 8c7bbd0

Please sign in to comment.