Skip to content

Commit

Permalink
#164 Added synchronized blocks in strategic places to avoid concurren…
Browse files Browse the repository at this point in the history
…cy issues.
  • Loading branch information
hrosa committed Jun 2, 2016
1 parent 846c70a commit b6126bf
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void execute() {
uri = segments.next().toString();

// Need to manually set terminated to false at this point
// Because object is recycled and reset is always called before this method.
// Because object is recycled and reset() is always called before this method.
this.terminated.set(false);

// start announcement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.net.MalformedURLException;
import java.util.Collection;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -97,7 +96,8 @@ public class PlayCollect extends Signal {
private boolean playerListenerAdded = false;

// Concurrency
private final Semaphore terminateSemaphore;
private final AtomicBoolean terminated;
private final Object LOCK;

public PlayCollect(String name) {
super(name);
Expand Down Expand Up @@ -131,7 +131,8 @@ public PlayCollect(String name) {
this.playerListenerAdded = false;

// Concurrency
this.terminateSemaphore = new Semaphore(1);
this.terminated = new AtomicBoolean(false);
this.LOCK = new Object();
}

@Override
Expand Down Expand Up @@ -185,6 +186,10 @@ public void execute() {
this.numberOfAttempts = 1;
}

// Need to manually set terminated to false at this point
// Because object is recycled and reset() is always called before this method.
this.terminated.set(false);

// if initial prompt has been specified then start with prompt phase
if (options.hasPrompt()) {
if (logger.isInfoEnabled()) {
Expand Down Expand Up @@ -212,50 +217,63 @@ public void execute() {
* @param options requested options.
*/
private void startPromptPhase(Collection<Text> promptList) {
player = (Player) getEndpoint().getResource(MediaType.AUDIO, ComponentType.PLAYER);
try {
// assign listener
if (!playerListenerAdded) {
player.addListener(promptHandler);
playerListenerAdded = true;
synchronized (this.LOCK) {
// Hotfix for concurrency issues
// https://github.com/RestComm/mediaserver/issues/164
if (this.terminated.get()) {
if (logger.isInfoEnabled()) {
logger.info("Skipping prompt phase because PlayCollect has been terminated.");
}
return;
}

promptLength = promptList.size();
prompt = promptList.toArray(prompt);
player.setURL(prompt[0].toString());

// specify URL to play
// player.setURL(options.getPrompt().toString());
player = (Player) getEndpoint().getResource(MediaType.AUDIO, ComponentType.PLAYER);
try {
// assign listener
if (!playerListenerAdded) {
player.addListener(promptHandler);
playerListenerAdded = true;
}

// start playback
player.activate();
} catch (TooManyListenersException e) {
of.fire(this, new Text("Too many listeners"));
logger.error("OPERATION FAILURE", e);
} catch (MalformedURLException e) {
logger.info("Received URL in invalid format , firing of");
of.fire(this, new Text(e.getMessage()));
return;
} catch (ResourceUnavailableException e) {
logger.info("Received URL can not be found , firing of");
of.fire(this, new Text(e.getMessage()));
return;
promptLength = promptList.size();
prompt = promptList.toArray(prompt);
player.setURL(prompt[0].toString());

// specify URL to play
// player.setURL(options.getPrompt().toString());

// start playback
player.activate();
} catch (TooManyListenersException e) {
of.fire(this, new Text("Too many listeners"));
logger.error("OPERATION FAILURE", e);
} catch (MalformedURLException e) {
logger.info("Received URL in invalid format , firing of");
of.fire(this, new Text(e.getMessage()));
return;
} catch (ResourceUnavailableException e) {
logger.info("Received URL can not be found , firing of");
of.fire(this, new Text(e.getMessage()));
return;
}
}
}

/**
* Terminates prompt phase if it was started or do nothing otherwise.
*/
private void terminatePrompt() {
// jump to end of segments
if (promptLength > 0) {
promptIndex = promptLength - 1;
}
if (player != null) {
player.deactivate();
player.removeListener(promptHandler);
playerListenerAdded = false;
player = null;
synchronized (this.LOCK) {
// jump to end of segments
if (promptLength > 0) {
promptIndex = promptLength - 1;
}
if (player != null) {
player.deactivate();
player.removeListener(promptHandler);
playerListenerAdded = false;
player = null;
}
}
}

Expand Down Expand Up @@ -307,25 +325,36 @@ private void flushBuffer() {
}

private void startCollectPhase() {
if (this.firstDigitTimer > 0 || this.maxDuration > 0) {
if (this.firstDigitTimer > 0) {
heartbeat.setTtl((int) (this.firstDigitTimer));
} else {
heartbeat.setTtl(-1);
synchronized (this.LOCK) {
// Hotfix for concurrency issues
// https://github.com/RestComm/mediaserver/issues/164
if (this.terminated.get()) {
if (logger.isInfoEnabled()) {
logger.info("Skipping collect phase because PlayCollect has been terminated.");
}
return;
}

if (this.maxDuration > 0) {
heartbeat.setOverallTtl(this.maxDuration);
} else {
heartbeat.setOverallTtl(-1);
if (this.firstDigitTimer > 0 || this.maxDuration > 0) {
if (this.firstDigitTimer > 0) {
heartbeat.setTtl((int) (this.firstDigitTimer));
} else {
heartbeat.setTtl(-1);
}

if (this.maxDuration > 0) {
heartbeat.setOverallTtl(this.maxDuration);
} else {
heartbeat.setOverallTtl(-1);
}

heartbeat.activate();
getEndpoint().getScheduler().submitHeatbeat(heartbeat);
}

heartbeat.activate();
getEndpoint().getScheduler().submitHeatbeat(heartbeat);
buffer.activate();
buffer.flush();
}

buffer.activate();
buffer.flush();
}

/**
Expand All @@ -348,27 +377,24 @@ private void terminateCollectPhase() {
* Terminates any activity.
*/
private void terminate() {
try {
terminateSemaphore.acquire();
} catch (InterruptedException e) {

}

this.isPromptActive = false;
this.terminatePrompt();
this.terminateCollectPhase();

if (this.heartbeat != null) {
this.heartbeat.disable();
this.heartbeat = null;
}
synchronized (this.LOCK) {
if (!this.terminated.get()) {
this.terminated.set(true);
this.isPromptActive = false;
this.terminatePrompt();
this.terminateCollectPhase();

if (this.heartbeat != null) {
this.heartbeat.disable();
this.heartbeat = null;
}

if (options != null) {
Options.recycle(options);
options = null;
if (options != null) {
Options.recycle(options);
options = null;
}
}
}

terminateSemaphore.release();
}

@Override
Expand Down Expand Up @@ -406,25 +432,36 @@ public void reset() {
}

private void next(long delay) {
segCount++;
promptIndex++;
try {
String url = prompt[promptIndex].toString();
if (logger.isInfoEnabled()) {
logger.info(String.format("(%s) Processing player next with url - %s", getEndpoint().getLocalName(), url));
synchronized (this.LOCK) {
// Hotfix for concurrency issues
// https://github.com/RestComm/mediaserver/issues/164
if (this.terminated.get() || !this.isPromptActive) {
if (logger.isInfoEnabled()) {
logger.info("Skipping prompt phase because PlayCollect has been terminated.");
}
return;
}

segCount++;
promptIndex++;
try {
String url = prompt[promptIndex].toString();
if (logger.isInfoEnabled()) {
logger.info(String.format("(%s) Processing player next with url - %s", getEndpoint().getLocalName(), url));
}
player.setURL(url);
player.setInitialDelay(delay);
// start playback
player.start();
} catch (MalformedURLException e) {
logger.error("Received URL in invalid format , firing of");
of.fire(this, new Text(e.getMessage()));
return;
} catch (ResourceUnavailableException e) {
logger.error("Received URL can not be found , firing of");
of.fire(this, new Text(e.getMessage()));
return;
}
player.setURL(url);
player.setInitialDelay(delay);
// start playback
player.start();
} catch (MalformedURLException e) {
logger.error("Received URL in invalid format , firing of");
of.fire(this, new Text(e.getMessage()));
return;
} catch (ResourceUnavailableException e) {
logger.error("Received URL can not be found , firing of");
of.fire(this, new Text(e.getMessage()));
return;
}
}

Expand Down

0 comments on commit b6126bf

Please sign in to comment.