Skip to content

Commit

Permalink
Fix issue 48: remove hardcoded level, remap FINE to FINER and INFO to…
Browse files Browse the repository at this point in the history
… FINE (#60)
  • Loading branch information
JamesBirdsall authored and sjkwak committed Feb 24, 2017
1 parent 2a97e12 commit 5477b55
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private Void updateCheckpointSync(Checkpoint checkpoint) throws Exception
{
// Need to fetch the most current lease data so that we can update it correctly.
AzureBlobLease lease = getLeaseSync(checkpoint.getPartitionId());
this.host.logWithHostAndPartition(Level.FINE, checkpoint.getPartitionId(), "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber());
this.host.logWithHostAndPartition(Level.FINER, checkpoint.getPartitionId(), "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber());
lease.setOffset(checkpoint.getOffset());
lease.setSequenceNumber(checkpoint.getSequenceNumber());
updateLeaseSync(lease);
Expand All @@ -200,7 +200,7 @@ private Void deleteCheckpointSync(String partitionId) throws Exception
{
// "Delete" a checkpoint by changing the offset to null, so first we need to fetch the most current lease
AzureBlobLease lease = getLeaseSync(partitionId);
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Deleting checkpoint for " + partitionId);
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Deleting checkpoint for " + partitionId);
lease.setOffset(null);
lease.setSequenceNumber(0L);
updateLeaseSync(lease);
Expand Down Expand Up @@ -334,7 +334,7 @@ private AzureBlobLease createLeaseIfNotExistsSync(String partitionId) throws URI
{
CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId);
returnLease = new AzureBlobLease(partitionId, leaseBlob);
this.host.logWithHostAndPartition(Level.INFO, partitionId,
this.host.logWithHostAndPartition(Level.FINE, partitionId,
"CreateLeaseIfNotExist - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.host.getConsumerGroupName() +
"storageBlobPrefix: " + this.storageBlobPrefix);
uploadLease(returnLease, leaseBlob, AccessCondition.generateIfNoneMatchCondition("*"), UploadActivity.Create);
Expand All @@ -347,7 +347,7 @@ private AzureBlobLease createLeaseIfNotExistsSync(String partitionId) throws URI
(extendedErrorInfo.getErrorCode().compareTo(StorageErrorCodeStrings.LEASE_ID_MISSING) == 0))) // occurs when somebody else already has leased the blob
{
// The blob already exists.
this.host.logWithHostAndPartition(Level.INFO, partitionId, "Lease already exists");
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Lease already exists");
returnLease = getLeaseSync(partitionId);
}
else
Expand All @@ -371,7 +371,7 @@ public Future<Void> deleteLease(Lease lease)

private Void deleteLeaseSync(AzureBlobLease lease) throws StorageException
{
this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Deleting lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Deleting lease");
lease.getBlob().deleteIfExists();
return null;
}
Expand All @@ -384,7 +384,7 @@ public Future<Boolean> acquireLease(Lease lease)

private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
{
this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Acquiring lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Acquiring lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
Expand All @@ -399,12 +399,12 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
leaseBlob.downloadAttributes();
if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED)
{
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "changeLease");
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "changeLease");
newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
}
else
{
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "acquireLease");
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "acquireLease");
newToken = leaseBlob.acquireLease(AzureStorageCheckpointLeaseManager.leaseDurationInSeconds, newLeaseId);
}
lease.setToken(newToken);
Expand Down Expand Up @@ -435,7 +435,7 @@ public Future<Boolean> renewLease(Lease lease)

private Boolean renewLeaseSync(AzureBlobLease lease) throws Exception
{
this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Renewing lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Renewing lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
Expand Down Expand Up @@ -467,7 +467,7 @@ public Future<Boolean> releaseLease(Lease lease)

private Boolean releaseLeaseSync(AzureBlobLease lease) throws Exception
{
this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Releasing lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Releasing lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
Expand Down Expand Up @@ -508,7 +508,7 @@ public Boolean updateLeaseSync(AzureBlobLease lease) throws Exception
return false;
}

this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Updating lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Updating lease");

String token = lease.getToken();
if ((token == null) || (token.length() == 0))
Expand Down Expand Up @@ -590,11 +590,11 @@ else if (lease.getOffset() != null)
private boolean wasLeaseLost(StorageException se, String partitionId)
{
boolean retval = false;
this.host.logWithHostAndPartition(Level.FINE, partitionId, "WAS LEASE LOST?");
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Http " + se.getHttpStatusCode());
this.host.logWithHostAndPartition(Level.FINER, partitionId, "WAS LEASE LOST?");
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Http " + se.getHttpStatusCode());
if (se.getExtendedErrorInformation() != null)
{
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage());
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage());
}
if ((se.getHttpStatusCode() == 409) || // conflict
(se.getHttpStatusCode() == 412)) // precondition failed
Expand All @@ -603,8 +603,8 @@ private boolean wasLeaseLost(StorageException se, String partitionId)
if (extendedErrorInfo != null)
{
String errorCode = extendedErrorInfo.getErrorCode();
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Error code: " + errorCode);
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Error message: " + extendedErrorInfo.getErrorMessage());
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Error code: " + errorCode);
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Error message: " + extendedErrorInfo.getErrorMessage());
if ((errorCode.compareTo(StorageErrorCodeStrings.LEASE_LOST) == 0) ||
(errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_LEASE_OPERATION) == 0) ||
(errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_BLOB_OPERATION) == 0) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ void specializedStartPump()
private void openClients() throws ServiceBusException, IOException, InterruptedException, ExecutionException
{
// Create new client
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Opening EH client");
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH client");
this.internalOperationFuture = EventHubClient.createFromConnectionString(this.host.getEventHubConnectionString());
this.eventHubClient = (EventHubClient) this.internalOperationFuture.get();
this.internalOperationFuture = null;

// Create new receiver and set options
Object startAt = this.partitionContext.getInitialOffset();
long epoch = this.lease.getEpoch();
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt);
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt);
if (startAt instanceof String)
{
this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(),
Expand Down Expand Up @@ -130,7 +130,7 @@ else if (startAt instanceof Instant)
this.partitionReceiver.setReceiveTimeout(this.host.getEventProcessorOptions().getReceiveTimeOut());
this.internalOperationFuture = null;

this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "EH client and receiver creation finished");
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "EH client and receiver creation finished");
}

private void cleanUpClients() // swallows all exceptions
Expand All @@ -146,14 +146,14 @@ private void cleanUpClients() // swallows all exceptions
this.partitionReceiver.setReceiveHandler(null);
}

this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH receiver");
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH receiver");
this.partitionReceiver.close();
this.partitionReceiver = null;
}

if (this.eventHubClient != null)
{
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH client");
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH client");
this.eventHubClient.close();
this.eventHubClient = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ public EventProcessorHost(
ILeaseManager leaseManager,
ExecutorService executorService)
{
EventProcessorHost.TRACE_LOGGER.setLevel(Level.SEVERE);

if ((hostName == null) || hostName.isEmpty())
{
throw new IllegalArgumentException("hostName argument must not be null or empty string");
Expand Down Expand Up @@ -376,7 +374,7 @@ public EventProcessorHost(

this.partitionManager = new PartitionManager(this);

logWithHost(Level.INFO, "New EventProcessorHost created");
logWithHost(Level.FINE, "New EventProcessorHost created");
}

/**
Expand Down Expand Up @@ -505,7 +503,7 @@ public Future<?> registerEventProcessorFactory(IEventProcessorFactory<?> factory
}
}

logWithHost(Level.INFO, "Starting event processing");
logWithHost(Level.FINE, "Starting event processing");
this.processorFactory = factory;
this.processorOptions = processorOptions;
return EventProcessorHost.executorService.submit(() -> this.partitionManager.initialize());
Expand All @@ -519,7 +517,7 @@ public Future<?> registerEventProcessorFactory(IEventProcessorFactory<?> factory
*/
public void unregisterEventProcessor() throws InterruptedException, ExecutionException
{
logWithHost(Level.INFO, "Stopping event processing");
logWithHost(Level.FINE, "Stopping event processing");

if (this.partitionManager != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ Object getInitialOffset() throws InterruptedException, ExecutionException
{
// No checkpoint was ever stored. Use the initialOffsetProvider instead.
Function<String, Object> initialOffsetProvider = this.host.getEventProcessorOptions().getInitialOffsetProvider();
this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Calling user-provided initial offset provider");
this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Calling user-provided initial offset provider");
startAt = initialOffsetProvider.apply(this.partitionId);
if (startAt instanceof String)
{
this.offset = (String)startAt;
this.sequenceNumber = 0; // TODO we use sequenceNumber to check for regression of offset, 0 could be a problem until it gets updated from an event
this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Initial offset provided: " + this.offset + "//" + this.sequenceNumber);
this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Initial offset provided: " + this.offset + "//" + this.sequenceNumber);
}
else if (startAt instanceof Instant)
{
// can't set offset/sequenceNumber
this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Initial timestamp provided: " + (Instant)startAt);
this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Initial timestamp provided: " + (Instant)startAt);
}
else
{
Expand All @@ -124,7 +124,7 @@ else if (startAt instanceof Instant)
this.offset = startingCheckpoint.getOffset();
startAt = this.offset;
this.sequenceNumber = startingCheckpoint.getSequenceNumber();
this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Retrieved starting offset " + this.offset + "//" + this.sequenceNumber);
this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Retrieved starting offset " + this.offset + "//" + this.sequenceNumber);
}

return startAt;
Expand Down Expand Up @@ -169,7 +169,7 @@ public void checkpoint(EventData event) throws IllegalArgumentException, Interru

private void persistCheckpoint(Checkpoint persistThis) throws IllegalArgumentException, InterruptedException, ExecutionException
{
this.host.logWithHostAndPartition(Level.FINE, persistThis.getPartitionId(), "Saving checkpoint: " +
this.host.logWithHostAndPartition(Level.FINER, persistThis.getPartitionId(), "Saving checkpoint: " +
persistThis.getOffset() + "//" + persistThis.getSequenceNumber());

Checkpoint inStoreCheckpoint = this.host.getCheckpointManager().getCheckpoint(persistThis.getPartitionId()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ Iterable<String> getPartitionIds() throws IllegalEntityException
throw new EPHConfigurationException(errorMessage, exception);
}

this.host.logWithHost(Level.INFO, "Eventhub " + this.host.getEventHubPath() + " count of partitions: " + this.partitionIds.size());
this.host.logWithHost(Level.FINE, "Eventhub " + this.host.getEventHubPath() + " count of partitions: " + this.partitionIds.size());
for (String id : this.partitionIds)
{
this.host.logWithHost(Level.FINE, "Found partition with id: " + id);
this.host.logWithHost(Level.FINER, "Found partition with id: " + id);
}
}

Expand Down Expand Up @@ -183,7 +183,7 @@ private Void runAndCleanUp()
try
{
runLoop();
this.host.logWithHost(Level.INFO, "Partition manager main loop exited normally, shutting down");
this.host.logWithHost(Level.FINE, "Partition manager main loop exited normally, shutting down");
}
catch (ExceptionWithAction e)
{
Expand Down Expand Up @@ -213,7 +213,7 @@ private Void runAndCleanUp()
}

// Cleanup
this.host.logWithHost(Level.INFO, "Shutting down all pumps");
this.host.logWithHost(Level.FINE, "Shutting down all pumps");
Iterable<Future<?>> pumpRemovals = this.pump.removeAllPumps(CloseReason.Shutdown);

// All of the shutdown threads have been launched, we can shut down the executor now.
Expand Down Expand Up @@ -244,7 +244,7 @@ private Void runAndCleanUp()
}
}

this.host.logWithHost(Level.INFO, "Partition manager exiting");
this.host.logWithHost(Level.FINE, "Partition manager exiting");

return null;
}
Expand Down Expand Up @@ -400,7 +400,7 @@ else if (possibleLease.getOwner().compareTo(this.host.getHostName()) == 0)
{
if (leaseManager.acquireLease(stealee).get())
{
this.host.logWithHostAndPartition(Level.INFO, stealee.getPartitionId(), "Stole lease");
this.host.logWithHostAndPartition(Level.FINE, stealee.getPartitionId(), "Stole lease");
allLeases.put(stealee.getPartitionId(), stealee);
ourLeasesCount++;
}
Expand All @@ -423,7 +423,7 @@ else if (possibleLease.getOwner().compareTo(this.host.getHostName()) == 0)
for (String partitionId : allLeases.keySet())
{
Lease updatedLease = allLeases.get(partitionId);
this.host.logWithHost(Level.FINE, "Lease on partition " + updatedLease.getPartitionId() + " owned by " + updatedLease.getOwner()); // DEBUG
this.host.logWithHost(Level.FINER, "Lease on partition " + updatedLease.getPartitionId() + " owned by " + updatedLease.getOwner()); // DEBUG
if (updatedLease.getOwner().compareTo(this.host.getHostName()) == 0)
{
this.pump.addPump(partitionId, updatedLease);
Expand Down Expand Up @@ -489,7 +489,7 @@ private Iterable<Lease> whichLeasesToSteal(ArrayList<Lease> stealableLeases, int
if (l.getOwner().compareTo(biggestOwner) == 0)
{
stealTheseLeases.add(l);
this.host.logWithHost(Level.FINE, "Proposed to steal lease for partition " + l.getPartitionId() + " from " + biggestOwner);
this.host.logWithHost(Level.FINER, "Proposed to steal lease for partition " + l.getPartitionId() + " from " + biggestOwner);
break;
}
}
Expand Down Expand Up @@ -529,9 +529,9 @@ private HashMap<String, Integer> countLeasesByOwner(Iterable<Lease> leases)
}
for (String owner : counts.keySet())
{
this.host.log(Level.FINE, "host " + owner + " owns " + counts.get(owner) + " leases");
this.host.log(Level.FINER, "host " + owner + " owns " + counts.get(owner) + " leases");
}
this.host.log(Level.FINE, "total hosts in sorted list: " + counts.size());
this.host.log(Level.FINER, "total hosts in sorted list: " + counts.size());

return counts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void shutdown(CloseReason reason)
}
this.pumpStatus = PartitionPumpStatus.PP_CLOSING;
}
this.host.logWithHostAndPartition(Level.INFO, this.partitionContext, "pump shutdown for reason " + reason.toString());
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "pump shutdown for reason " + reason.toString());

specializedShutdown(reason);

Expand Down Expand Up @@ -157,7 +157,7 @@ protected void onEvents(Iterable<EventData> events)
}
if (last != null)
{
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Updating offset in partition context with end of batch " +
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Updating offset in partition context with end of batch " +
last.getSystemProperties().getOffset() + "//" + last.getSystemProperties().getSequenceNumber());
this.partitionContext.setOffsetAndSequenceNumber(last);
}
Expand Down
Loading

0 comments on commit 5477b55

Please sign in to comment.