From b6126bf60571d6a81f2e25995fc2c0ae2ff2e2c1 Mon Sep 17 00:00:00 2001 From: hrosa Date: Thu, 2 Jun 2016 16:16:16 +0100 Subject: [PATCH] #164 Added synchronized blocks in strategic places to avoid concurrency issues. --- .../media/server/mgcp/pkg/au/Play.java | 2 +- .../media/server/mgcp/pkg/au/PlayCollect.java | 213 ++++++++++-------- 2 files changed, 126 insertions(+), 89 deletions(-) diff --git a/controls/mgcp/src/main/java/org/mobicents/media/server/mgcp/pkg/au/Play.java b/controls/mgcp/src/main/java/org/mobicents/media/server/mgcp/pkg/au/Play.java index d5a2f7950..84dd2373d 100644 --- a/controls/mgcp/src/main/java/org/mobicents/media/server/mgcp/pkg/au/Play.java +++ b/controls/mgcp/src/main/java/org/mobicents/media/server/mgcp/pkg/au/Play.java @@ -124,7 +124,7 @@ public void execute() { uri = segments.next().toString(); // Need to manually set terminated to false at this point - // Because object is recycled and reset is always called before this method. + // Because object is recycled and reset() is always called before this method. this.terminated.set(false); // start announcement diff --git a/controls/mgcp/src/main/java/org/mobicents/media/server/mgcp/pkg/au/PlayCollect.java b/controls/mgcp/src/main/java/org/mobicents/media/server/mgcp/pkg/au/PlayCollect.java index 8ca257582..50d61945c 100644 --- a/controls/mgcp/src/main/java/org/mobicents/media/server/mgcp/pkg/au/PlayCollect.java +++ b/controls/mgcp/src/main/java/org/mobicents/media/server/mgcp/pkg/au/PlayCollect.java @@ -24,7 +24,6 @@ import java.net.MalformedURLException; import java.util.Collection; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -97,7 +96,8 @@ public class PlayCollect extends Signal { private boolean playerListenerAdded = false; // Concurrency - private final Semaphore terminateSemaphore; + private final AtomicBoolean terminated; + private final Object LOCK; public PlayCollect(String name) { super(name); @@ -131,7 +131,8 @@ public PlayCollect(String name) { this.playerListenerAdded = false; // Concurrency - this.terminateSemaphore = new Semaphore(1); + this.terminated = new AtomicBoolean(false); + this.LOCK = new Object(); } @Override @@ -185,6 +186,10 @@ public void execute() { this.numberOfAttempts = 1; } + // Need to manually set terminated to false at this point + // Because object is recycled and reset() is always called before this method. + this.terminated.set(false); + // if initial prompt has been specified then start with prompt phase if (options.hasPrompt()) { if (logger.isInfoEnabled()) { @@ -212,34 +217,45 @@ public void execute() { * @param options requested options. */ private void startPromptPhase(Collection promptList) { - player = (Player) getEndpoint().getResource(MediaType.AUDIO, ComponentType.PLAYER); - try { - // assign listener - if (!playerListenerAdded) { - player.addListener(promptHandler); - playerListenerAdded = true; + synchronized (this.LOCK) { + // Hotfix for concurrency issues + // https://github.com/RestComm/mediaserver/issues/164 + if (this.terminated.get()) { + if (logger.isInfoEnabled()) { + logger.info("Skipping prompt phase because PlayCollect has been terminated."); + } + return; } - promptLength = promptList.size(); - prompt = promptList.toArray(prompt); - player.setURL(prompt[0].toString()); - - // specify URL to play - // player.setURL(options.getPrompt().toString()); + player = (Player) getEndpoint().getResource(MediaType.AUDIO, ComponentType.PLAYER); + try { + // assign listener + if (!playerListenerAdded) { + player.addListener(promptHandler); + playerListenerAdded = true; + } - // start playback - player.activate(); - } catch (TooManyListenersException e) { - of.fire(this, new Text("Too many listeners")); - logger.error("OPERATION FAILURE", e); - } catch (MalformedURLException e) { - logger.info("Received URL in invalid format , firing of"); - of.fire(this, new Text(e.getMessage())); - return; - } catch (ResourceUnavailableException e) { - logger.info("Received URL can not be found , firing of"); - of.fire(this, new Text(e.getMessage())); - return; + promptLength = promptList.size(); + prompt = promptList.toArray(prompt); + player.setURL(prompt[0].toString()); + + // specify URL to play + // player.setURL(options.getPrompt().toString()); + + // start playback + player.activate(); + } catch (TooManyListenersException e) { + of.fire(this, new Text("Too many listeners")); + logger.error("OPERATION FAILURE", e); + } catch (MalformedURLException e) { + logger.info("Received URL in invalid format , firing of"); + of.fire(this, new Text(e.getMessage())); + return; + } catch (ResourceUnavailableException e) { + logger.info("Received URL can not be found , firing of"); + of.fire(this, new Text(e.getMessage())); + return; + } } } @@ -247,15 +263,17 @@ private void startPromptPhase(Collection promptList) { * Terminates prompt phase if it was started or do nothing otherwise. */ private void terminatePrompt() { - // jump to end of segments - if (promptLength > 0) { - promptIndex = promptLength - 1; - } - if (player != null) { - player.deactivate(); - player.removeListener(promptHandler); - playerListenerAdded = false; - player = null; + synchronized (this.LOCK) { + // jump to end of segments + if (promptLength > 0) { + promptIndex = promptLength - 1; + } + if (player != null) { + player.deactivate(); + player.removeListener(promptHandler); + playerListenerAdded = false; + player = null; + } } } @@ -307,25 +325,36 @@ private void flushBuffer() { } private void startCollectPhase() { - if (this.firstDigitTimer > 0 || this.maxDuration > 0) { - if (this.firstDigitTimer > 0) { - heartbeat.setTtl((int) (this.firstDigitTimer)); - } else { - heartbeat.setTtl(-1); + synchronized (this.LOCK) { + // Hotfix for concurrency issues + // https://github.com/RestComm/mediaserver/issues/164 + if (this.terminated.get()) { + if (logger.isInfoEnabled()) { + logger.info("Skipping collect phase because PlayCollect has been terminated."); + } + return; } - if (this.maxDuration > 0) { - heartbeat.setOverallTtl(this.maxDuration); - } else { - heartbeat.setOverallTtl(-1); + if (this.firstDigitTimer > 0 || this.maxDuration > 0) { + if (this.firstDigitTimer > 0) { + heartbeat.setTtl((int) (this.firstDigitTimer)); + } else { + heartbeat.setTtl(-1); + } + + if (this.maxDuration > 0) { + heartbeat.setOverallTtl(this.maxDuration); + } else { + heartbeat.setOverallTtl(-1); + } + + heartbeat.activate(); + getEndpoint().getScheduler().submitHeatbeat(heartbeat); } - heartbeat.activate(); - getEndpoint().getScheduler().submitHeatbeat(heartbeat); + buffer.activate(); + buffer.flush(); } - - buffer.activate(); - buffer.flush(); } /** @@ -348,27 +377,24 @@ private void terminateCollectPhase() { * Terminates any activity. */ private void terminate() { - try { - terminateSemaphore.acquire(); - } catch (InterruptedException e) { - - } - - this.isPromptActive = false; - this.terminatePrompt(); - this.terminateCollectPhase(); - - if (this.heartbeat != null) { - this.heartbeat.disable(); - this.heartbeat = null; - } + synchronized (this.LOCK) { + if (!this.terminated.get()) { + this.terminated.set(true); + this.isPromptActive = false; + this.terminatePrompt(); + this.terminateCollectPhase(); + + if (this.heartbeat != null) { + this.heartbeat.disable(); + this.heartbeat = null; + } - if (options != null) { - Options.recycle(options); - options = null; + if (options != null) { + Options.recycle(options); + options = null; + } + } } - - terminateSemaphore.release(); } @Override @@ -406,25 +432,36 @@ public void reset() { } private void next(long delay) { - segCount++; - promptIndex++; - try { - String url = prompt[promptIndex].toString(); - if (logger.isInfoEnabled()) { - logger.info(String.format("(%s) Processing player next with url - %s", getEndpoint().getLocalName(), url)); + synchronized (this.LOCK) { + // Hotfix for concurrency issues + // https://github.com/RestComm/mediaserver/issues/164 + if (this.terminated.get() || !this.isPromptActive) { + if (logger.isInfoEnabled()) { + logger.info("Skipping prompt phase because PlayCollect has been terminated."); + } + return; + } + + segCount++; + promptIndex++; + try { + String url = prompt[promptIndex].toString(); + if (logger.isInfoEnabled()) { + logger.info(String.format("(%s) Processing player next with url - %s", getEndpoint().getLocalName(), url)); + } + player.setURL(url); + player.setInitialDelay(delay); + // start playback + player.start(); + } catch (MalformedURLException e) { + logger.error("Received URL in invalid format , firing of"); + of.fire(this, new Text(e.getMessage())); + return; + } catch (ResourceUnavailableException e) { + logger.error("Received URL can not be found , firing of"); + of.fire(this, new Text(e.getMessage())); + return; } - player.setURL(url); - player.setInitialDelay(delay); - // start playback - player.start(); - } catch (MalformedURLException e) { - logger.error("Received URL in invalid format , firing of"); - of.fire(this, new Text(e.getMessage())); - return; - } catch (ResourceUnavailableException e) { - logger.error("Received URL can not be found , firing of"); - of.fire(this, new Text(e.getMessage())); - return; } }