Skip to content

Commit

Permalink
Merge pull request #2093 from tronprotocol/native_eventqueue
Browse files Browse the repository at this point in the history
Native eventqueue
  • Loading branch information
DorianRust authored Apr 8, 2019
2 parents cf0b575 + 93a928b commit 161a3d3
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 62 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ dependencies {

compile "io.vavr:vavr:0.9.2"
compile group: 'org.pf4j', name: 'pf4j', version: '2.5.0'
compile group: 'org.zeromq', name: 'jeromq', version: '0.5.0'

}

check.dependsOn 'lint'
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/tron/common/logsfilter/EventPluginConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ public class EventPluginConfig {
@Setter
private String dbConfig;

@Getter
@Setter
private boolean useNativeQueue;

@Getter
@Setter
private int bindPort;

@Getter
@Setter
private int sendQueueLength;


@Getter
@Setter
Expand All @@ -33,6 +45,9 @@ public EventPluginConfig() {
pluginPath = "";
serverAddress = "";
dbConfig = "";
useNativeQueue = false;
bindPort = 0;
sendQueueLength = 0;
triggerConfigList = new ArrayList<>();
}
}
163 changes: 116 additions & 47 deletions src/main/java/org/tron/common/logsfilter/EventPluginLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import org.pf4j.ManifestPluginDescriptorFinder;
import org.pf4j.PluginManager;
import org.springframework.util.StringUtils;
import org.tron.common.logsfilter.nativequeue.NativeMessageQueue;
import org.tron.common.logsfilter.trigger.BlockLogTrigger;
import org.tron.common.logsfilter.trigger.ContractEventTrigger;
import org.tron.common.logsfilter.trigger.ContractLogTrigger;
import org.tron.common.logsfilter.trigger.TransactionLogTrigger;
import org.tron.common.logsfilter.trigger.Trigger;
import org.tron.common.utils.StringUtil;

@Slf4j
public class EventPluginLoader {
Expand Down Expand Up @@ -45,6 +45,8 @@ public class EventPluginLoader {

private FilterQuery filterQuery;

private boolean useNativeQueue = false;

public static EventPluginLoader getInstance() {
if (Objects.isNull(instance)) {
synchronized (EventPluginLoader.class) {
Expand All @@ -56,17 +58,29 @@ public static EventPluginLoader getInstance() {
return instance;
}

public boolean start(EventPluginConfig config) {
boolean success = false;
private boolean launchNativeQueue(EventPluginConfig config){

if (Objects.isNull(config)) {
return success;
if (!NativeMessageQueue.getInstance().start(config.getBindPort(), config.getSendQueueLength())){
return false;
}

if (Objects.isNull(triggerConfigList)){
logger.error("trigger config is null");
return false;
}

triggerConfigList.forEach(triggerConfig -> {
setSingleTriggerConfig(triggerConfig);
});

return true;
}

private boolean launchEventPlugin(EventPluginConfig config){
boolean success = false;
// parsing subscribe config from config.conf
String pluginPath = config.getPluginPath();
this.serverAddress = config.getServerAddress();
this.triggerConfigList = config.getTriggerConfigList();
this.dbConfig = config.getDbConfig();

if (!startPlugin(pluginPath)) {
Expand All @@ -83,6 +97,24 @@ public boolean start(EventPluginConfig config) {
return true;
}

public boolean start(EventPluginConfig config) {
boolean success = false;

if (Objects.isNull(config)) {
return success;
}

this.triggerConfigList = config.getTriggerConfigList();

useNativeQueue = config.isUseNativeQueue();

if (config.isUseNativeQueue()){
return launchNativeQueue(config);
}

return launchEventPlugin(config);
}

private void setPluginConfig() {

if (Objects.isNull(eventListeners)) {
Expand All @@ -96,39 +128,58 @@ private void setPluginConfig() {
eventListeners.forEach(listener -> listener.setDBConfig(this.dbConfig));

triggerConfigList.forEach(triggerConfig -> {
if (EventPluginConfig.BLOCK_TRIGGER_NAME.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
blockLogTriggerEnable = true;
} else {
blockLogTriggerEnable = false;
}
setSingleTriggerConfig(triggerConfig);
});
}

private void setSingleTriggerConfig(TriggerConfig triggerConfig){
if (EventPluginConfig.BLOCK_TRIGGER_NAME.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
blockLogTriggerEnable = true;
} else {
blockLogTriggerEnable = false;
}

if (!useNativeQueue){
setPluginTopic(Trigger.BLOCK_TRIGGER, triggerConfig.getTopic());
} else if (EventPluginConfig.TRANSACTION_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
transactionLogTriggerEnable = true;
} else {
transactionLogTriggerEnable = false;
}
}

} else if (EventPluginConfig.TRANSACTION_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
transactionLogTriggerEnable = true;
} else {
transactionLogTriggerEnable = false;
}

if (!useNativeQueue){
setPluginTopic(Trigger.TRANSACTION_TRIGGER, triggerConfig.getTopic());
} else if (EventPluginConfig.CONTRACTEVENT_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
contractEventTriggerEnable = true;
} else {
contractEventTriggerEnable = false;
}
}

} else if (EventPluginConfig.CONTRACTEVENT_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
contractEventTriggerEnable = true;
} else {
contractEventTriggerEnable = false;
}

if (!useNativeQueue){
setPluginTopic(Trigger.CONTRACTEVENT_TRIGGER, triggerConfig.getTopic());
} else if (EventPluginConfig.CONTRACTLOG_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
contractLogTriggerEnable = true;
} else {
contractLogTriggerEnable = false;
}
}

} else if (EventPluginConfig.CONTRACTLOG_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
contractLogTriggerEnable = true;
} else {
contractLogTriggerEnable = false;
}

if (!useNativeQueue){
setPluginTopic(Trigger.CONTRACTLOG_TRIGGER, triggerConfig.getTopic());
}
});
}
}

public synchronized boolean isBlockLogTriggerEnable() {
Expand Down Expand Up @@ -195,32 +246,52 @@ protected CompoundPluginDescriptorFinder createPluginDescriptorFinder() {
}

public void stopPlugin() {
if (Objects.isNull(pluginManager)) {
logger.info("pluginManager is null");
return;
if (Objects.nonNull(pluginManager)){
pluginManager.stopPlugins();
}

pluginManager.stopPlugins();
NativeMessageQueue.getInstance().stop();

logger.info("eventPlugin stopped");
}

public void postBlockTrigger(BlockLogTrigger trigger) {
eventListeners.forEach(listener ->
listener.handleBlockEvent(toJsonString(trigger)));
if (useNativeQueue){
NativeMessageQueue.getInstance().publishTrigger(toJsonString(trigger), trigger.getTriggerName());
}
else {
eventListeners.forEach(listener ->
listener.handleBlockEvent(toJsonString(trigger)));
}
}

public void postTransactionTrigger(TransactionLogTrigger trigger) {
eventListeners.forEach(listener -> listener.handleTransactionTrigger(toJsonString(trigger)));
if (useNativeQueue){
NativeMessageQueue.getInstance().publishTrigger(toJsonString(trigger), trigger.getTriggerName());
}
else {
eventListeners.forEach(listener -> listener.handleTransactionTrigger(toJsonString(trigger)));
}
}

public void postContractLogTrigger(ContractLogTrigger trigger) {
eventListeners.forEach(listener ->
listener.handleContractLogTrigger(toJsonString(trigger)));
if (useNativeQueue){
NativeMessageQueue.getInstance().publishTrigger(toJsonString(trigger), trigger.getTriggerName());
}
else {
eventListeners.forEach(listener ->
listener.handleContractLogTrigger(toJsonString(trigger)));
}
}

public void postContractEventTrigger(ContractEventTrigger trigger) {
eventListeners.forEach(listener ->
listener.handleContractEventTrigger(toJsonString(trigger)));
if (useNativeQueue){
NativeMessageQueue.getInstance().publishTrigger(toJsonString(trigger), trigger.getTriggerName());
}
else {
eventListeners.forEach(listener ->
listener.handleContractEventTrigger(toJsonString(trigger)));
}
}

private String toJsonString(Object data) {
Expand All @@ -242,6 +313,4 @@ public synchronized void setFilterQuery(FilterQuery filterQuery) {
public synchronized FilterQuery getFilterQuery() {
return filterQuery;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.tron.common.logsfilter.nativequeue;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.util.Objects;

public class NativeMessageQueue {
ZContext context = null;
private ZMQ.Socket publisher = null;
private static NativeMessageQueue instance;
private static final int DEFAULT_BIND_PORT = 5555;
private static final int DEFAULT_QUEUE_LENGTH = 1000;
public static NativeMessageQueue getInstance() {
if (Objects.isNull(instance)) {
synchronized (NativeMessageQueue.class) {
if (Objects.isNull(instance)) {
instance = new NativeMessageQueue();
}
}
}
return instance;
}

public boolean start(int bindPort, int sendQueueLength) {
context = new ZContext();
publisher = context.createSocket(SocketType.PUB);

if (Objects.isNull(publisher)){
return false;
}

if (bindPort == 0 || bindPort < 0) {
bindPort = DEFAULT_BIND_PORT;
}

if (sendQueueLength ==0 || sendQueueLength < 0){
sendQueueLength = DEFAULT_QUEUE_LENGTH;
}

context.setSndHWM(sendQueueLength);

String bindAddress = String.format("tcp://*:%d", bindPort);
publisher.bind(bindAddress);

return true;
}

public void stop(){
if (Objects.nonNull(publisher)){
publisher.close();
}

if (Objects.nonNull(context)){
context.close();
}
}

public void publishTrigger(String data, String topic){
if (Objects.isNull(publisher) || Objects.isNull(context.isClosed()) || context.isClosed()) {
return;
}

publisher.sendMore(topic);
publisher.send(data);
}
}
Loading

0 comments on commit 161a3d3

Please sign in to comment.