Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native eventqueue #2093

Merged
merged 10 commits into from
Apr 8, 2019
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ dependencies {

compile "io.vavr:vavr:0.9.2"
compile group: 'org.pf4j', name: 'pf4j', version: '2.5.0'
compile group: 'org.zeromq', name: 'jeromq', version: '0.5.0'

}

check.dependsOn 'lint'
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/tron/common/logsfilter/EventPluginConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ public class EventPluginConfig {
@Setter
private String dbConfig;

@Getter
@Setter
private boolean useNativeQueue;

@Getter
@Setter
private int bindPort;

@Getter
@Setter
private int sendQueueLength;


@Getter
@Setter
Expand All @@ -33,6 +45,9 @@ public EventPluginConfig() {
pluginPath = "";
serverAddress = "";
dbConfig = "";
useNativeQueue = false;
bindPort = 0;
sendQueueLength = 0;
triggerConfigList = new ArrayList<>();
}
}
163 changes: 116 additions & 47 deletions src/main/java/org/tron/common/logsfilter/EventPluginLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import org.pf4j.ManifestPluginDescriptorFinder;
import org.pf4j.PluginManager;
import org.springframework.util.StringUtils;
import org.tron.common.logsfilter.nativequeue.NativeMessageQueue;
import org.tron.common.logsfilter.trigger.BlockLogTrigger;
import org.tron.common.logsfilter.trigger.ContractEventTrigger;
import org.tron.common.logsfilter.trigger.ContractLogTrigger;
import org.tron.common.logsfilter.trigger.TransactionLogTrigger;
import org.tron.common.logsfilter.trigger.Trigger;
import org.tron.common.utils.StringUtil;

@Slf4j
public class EventPluginLoader {
Expand Down Expand Up @@ -45,6 +45,8 @@ public class EventPluginLoader {

private FilterQuery filterQuery;

private boolean useNativeQueue = false;

public static EventPluginLoader getInstance() {
if (Objects.isNull(instance)) {
synchronized (EventPluginLoader.class) {
Expand All @@ -56,17 +58,29 @@ public static EventPluginLoader getInstance() {
return instance;
}

public boolean start(EventPluginConfig config) {
boolean success = false;
private boolean launchNativeQueue(EventPluginConfig config){

if (Objects.isNull(config)) {
return success;
if (!NativeMessageQueue.getInstance().start(config.getBindPort(), config.getSendQueueLength())){
return false;
}

if (Objects.isNull(triggerConfigList)){
logger.error("trigger config is null");
return false;
}

triggerConfigList.forEach(triggerConfig -> {
setSingleTriggerConfig(triggerConfig);
});

return true;
}

private boolean launchEventPlugin(EventPluginConfig config){
boolean success = false;
// parsing subscribe config from config.conf
String pluginPath = config.getPluginPath();
this.serverAddress = config.getServerAddress();
this.triggerConfigList = config.getTriggerConfigList();
this.dbConfig = config.getDbConfig();

if (!startPlugin(pluginPath)) {
Expand All @@ -83,6 +97,24 @@ public boolean start(EventPluginConfig config) {
return true;
}

public boolean start(EventPluginConfig config) {
boolean success = false;

if (Objects.isNull(config)) {
return success;
}

this.triggerConfigList = config.getTriggerConfigList();

useNativeQueue = config.isUseNativeQueue();

if (config.isUseNativeQueue()){
return launchNativeQueue(config);
}

return launchEventPlugin(config);
}

private void setPluginConfig() {

if (Objects.isNull(eventListeners)) {
Expand All @@ -96,39 +128,58 @@ private void setPluginConfig() {
eventListeners.forEach(listener -> listener.setDBConfig(this.dbConfig));

triggerConfigList.forEach(triggerConfig -> {
if (EventPluginConfig.BLOCK_TRIGGER_NAME.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
blockLogTriggerEnable = true;
} else {
blockLogTriggerEnable = false;
}
setSingleTriggerConfig(triggerConfig);
});
}

private void setSingleTriggerConfig(TriggerConfig triggerConfig){
if (EventPluginConfig.BLOCK_TRIGGER_NAME.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
blockLogTriggerEnable = true;
} else {
blockLogTriggerEnable = false;
}

if (!useNativeQueue){
setPluginTopic(Trigger.BLOCK_TRIGGER, triggerConfig.getTopic());
} else if (EventPluginConfig.TRANSACTION_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
transactionLogTriggerEnable = true;
} else {
transactionLogTriggerEnable = false;
}
}

} else if (EventPluginConfig.TRANSACTION_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
transactionLogTriggerEnable = true;
} else {
transactionLogTriggerEnable = false;
}

if (!useNativeQueue){
setPluginTopic(Trigger.TRANSACTION_TRIGGER, triggerConfig.getTopic());
} else if (EventPluginConfig.CONTRACTEVENT_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
contractEventTriggerEnable = true;
} else {
contractEventTriggerEnable = false;
}
}

} else if (EventPluginConfig.CONTRACTEVENT_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
contractEventTriggerEnable = true;
} else {
contractEventTriggerEnable = false;
}

if (!useNativeQueue){
setPluginTopic(Trigger.CONTRACTEVENT_TRIGGER, triggerConfig.getTopic());
} else if (EventPluginConfig.CONTRACTLOG_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
contractLogTriggerEnable = true;
} else {
contractLogTriggerEnable = false;
}
}

} else if (EventPluginConfig.CONTRACTLOG_TRIGGER_NAME
.equalsIgnoreCase(triggerConfig.getTriggerName())) {
if (triggerConfig.isEnabled()) {
contractLogTriggerEnable = true;
} else {
contractLogTriggerEnable = false;
}

if (!useNativeQueue){
setPluginTopic(Trigger.CONTRACTLOG_TRIGGER, triggerConfig.getTopic());
}
});
}
}

public synchronized boolean isBlockLogTriggerEnable() {
Expand Down Expand Up @@ -195,32 +246,52 @@ protected CompoundPluginDescriptorFinder createPluginDescriptorFinder() {
}

public void stopPlugin() {
if (Objects.isNull(pluginManager)) {
logger.info("pluginManager is null");
return;
if (Objects.nonNull(pluginManager)){
pluginManager.stopPlugins();
}

pluginManager.stopPlugins();
NativeMessageQueue.getInstance().stop();

logger.info("eventPlugin stopped");
}

public void postBlockTrigger(BlockLogTrigger trigger) {
eventListeners.forEach(listener ->
listener.handleBlockEvent(toJsonString(trigger)));
if (useNativeQueue){
NativeMessageQueue.getInstance().publishTrigger(toJsonString(trigger), trigger.getTriggerName());
}
else {
eventListeners.forEach(listener ->
listener.handleBlockEvent(toJsonString(trigger)));
}
}

public void postTransactionTrigger(TransactionLogTrigger trigger) {
eventListeners.forEach(listener -> listener.handleTransactionTrigger(toJsonString(trigger)));
if (useNativeQueue){
NativeMessageQueue.getInstance().publishTrigger(toJsonString(trigger), trigger.getTriggerName());
}
else {
eventListeners.forEach(listener -> listener.handleTransactionTrigger(toJsonString(trigger)));
}
}

public void postContractLogTrigger(ContractLogTrigger trigger) {
eventListeners.forEach(listener ->
listener.handleContractLogTrigger(toJsonString(trigger)));
if (useNativeQueue){
NativeMessageQueue.getInstance().publishTrigger(toJsonString(trigger), trigger.getTriggerName());
}
else {
eventListeners.forEach(listener ->
listener.handleContractLogTrigger(toJsonString(trigger)));
}
}

public void postContractEventTrigger(ContractEventTrigger trigger) {
eventListeners.forEach(listener ->
listener.handleContractEventTrigger(toJsonString(trigger)));
if (useNativeQueue){
NativeMessageQueue.getInstance().publishTrigger(toJsonString(trigger), trigger.getTriggerName());
}
else {
eventListeners.forEach(listener ->
listener.handleContractEventTrigger(toJsonString(trigger)));
}
}

private String toJsonString(Object data) {
Expand All @@ -242,6 +313,4 @@ public synchronized void setFilterQuery(FilterQuery filterQuery) {
public synchronized FilterQuery getFilterQuery() {
return filterQuery;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.tron.common.logsfilter.nativequeue;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.util.Objects;

public class NativeMessageQueue {
ZContext context = null;
private ZMQ.Socket publisher = null;
private static NativeMessageQueue instance;
private static final int DEFAULT_BIND_PORT = 5555;
private static final int DEFAULT_QUEUE_LENGTH = 1000;
public static NativeMessageQueue getInstance() {
if (Objects.isNull(instance)) {
synchronized (NativeMessageQueue.class) {
if (Objects.isNull(instance)) {
instance = new NativeMessageQueue();
}
}
}
return instance;
}

public boolean start(int bindPort, int sendQueueLength) {
context = new ZContext();
publisher = context.createSocket(SocketType.PUB);

if (Objects.isNull(publisher)){
return false;
}

if (bindPort == 0 || bindPort < 0) {
bindPort = DEFAULT_BIND_PORT;
}

if (sendQueueLength ==0 || sendQueueLength < 0){
sendQueueLength = DEFAULT_QUEUE_LENGTH;
}

context.setSndHWM(sendQueueLength);

String bindAddress = String.format("tcp://*:%d", bindPort);
publisher.bind(bindAddress);

return true;
}

public void stop(){
if (Objects.nonNull(publisher)){
publisher.close();
}

if (Objects.nonNull(context)){
context.close();
}
}

public void publishTrigger(String data, String topic){
if (Objects.isNull(publisher) || Objects.isNull(context.isClosed()) || context.isClosed()) {
return;
}

publisher.sendMore(topic);
publisher.send(data);
}
}
Loading