Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bulkprocessor #564

Merged
merged 6 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,45 @@ public void testRegexpMultipleMoreOutputVarsThanGroups() {
out.assertFieldEquals("part2", "text2");
}

@Test
public void testIpRegexWithPort() {

Record record1 = getRecord();
record1.setField("host", FieldType.STRING, "84.209.99.184:8888");

TestRunner testRunner = TestRunners.newTestRunner(new ApplyRegexp());
testRunner.setProperty("host", "part1:((.+)(?=\\:)|\\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\b)");
testRunner.assertValid();
testRunner.enqueue(record1);
testRunner.run();
testRunner.assertAllInputRecordsProcessed();
testRunner.assertOutputRecordsCount(1);

MockRecord out = testRunner.getOutputRecords().get(0);
out.assertRecordSizeEquals(6);
out.assertFieldEquals("part1", "84.209.99.184");
}

@Test
public void testIpRegexWithoutPort() {

Record record1 = getRecord();
record1.setField("remoteHost", FieldType.STRING, "84.209.99.184");

TestRunner testRunner = TestRunners.newTestRunner(new ApplyRegexp());
testRunner.setProperty("remoteHost", "remoteHost:((.+)(?=\\:)|\\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\b)");
testRunner.assertValid();
testRunner.enqueue(record1);
testRunner.run();
testRunner.assertAllInputRecordsProcessed();
testRunner.assertOutputRecordsCount(1);

MockRecord out = testRunner.getOutputRecords().get(0);
out.assertRecordSizeEquals(5);
out.assertFieldEquals("remoteHost", "84.209.99.184");
}


@Test
public void testRegexpMultipleLessOutputVarsThanGroups() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@
<artifactId>vertx-rx-java2</artifactId>
<version>${vertx.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>


</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.hurence.logisland.rest.processor.lookup;


import com.google.common.collect.Lists;
import com.hurence.logisland.component.AllowableValue;
import com.hurence.logisland.component.InitializationException;
import com.hurence.logisland.component.PropertyDescriptor;
Expand All @@ -31,6 +32,7 @@
import com.hurence.logisland.validator.ValidationResult;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public abstract class AbstractCallRequest extends AbstractHttpProcessor
Expand Down Expand Up @@ -59,6 +61,27 @@ public abstract class AbstractCallRequest extends AbstractHttpProcessor
.expressionLanguageSupported(true)
.build();

/*
tag1=valuea,valueb;tag2=valuea,valuec
*/
public static final PropertyDescriptor TAG_KEY_VALUE = new PropertyDescriptor.Builder()
.name("tag.map")
.description("the tags from the record with their values to allow the bulk rest call")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

/*
records.threshold=1000
*/
public static final PropertyDescriptor RECORDS_THRESHOLD = new PropertyDescriptor.Builder()
.name("records.threshold")
.description("the number of records you want to bulk together to construct the curl body")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();


public static final PropertyDescriptor REQUEST_BODY = new PropertyDescriptor.Builder()
.name("request.body")
.description("The body to use for the request.")
Expand Down Expand Up @@ -152,6 +175,8 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
props.add(CONFLICT_RESOLUTION_POLICY);
props.add(VALID_HTTP_CODES);
props.add(KEEP_ONLY_BODY_RESPONSE);
props.add(TAG_KEY_VALUE);
props.add(RECORDS_THRESHOLD);
return Collections.unmodifiableList(props);
}

Expand Down Expand Up @@ -213,6 +238,62 @@ Optional<String> calculBody(Record record, ProcessContext context) {
return Optional.empty();
}

ArrayList<Optional<String>> concatBody(Collection<Record> records, ProcessContext context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method does not seem general to me. I mean it is usefull only for a specific case so I wonder why it is in AbstractCallRequest. And this method is currently used only in one processor so there is no reason to have this method in AbstractCallRequest (which should really be general)

ArrayList<Optional<String>> result = new ArrayList<>();
if (records != null && !records.isEmpty()) {
int partition_size = (context.getPropertyValue(RECORDS_THRESHOLD).isSet()) ? context.getPropertyValue(RECORDS_THRESHOLD).asInteger() : 1000;
ArrayList<Record> records_list = new ArrayList<>(records);
for (List<Record> partition : Lists.partition(records_list, partition_size)) {

StringBuffer buffer = new StringBuffer();
for (Record record : partition ) {
if (triggerRestCall(record, context) && record.getField("ItemId") != null && record.getField("Userid") != null
&& record.getField("ItemId").isSet() && record.getField("Userid").isSet()) {
buffer.append("{");

try {
buffer.append("\"id\":" + Long.parseLong(record.getField("Userid").asString() + record.getField("ItemId").asString()));
buffer.append(",");
} catch (NumberFormatException e) {
getLogger().debug("User id or Item can't be parsed to long (maybe undefined)" + e.getMessage());
}

try {
if (record.getField("SecondsViewed").isSet()) {
buffer.append("\"timeWatched\":" + record.getField("SecondsViewed").asLong());
buffer.append(",");
}

if (record.getField("VideoPercentViewed").isSet()) {
buffer.append("\"watched\":" + record.getField("VideoPercentViewed").asInteger());
buffer.append(",");
}
} catch (Exception e) {
getLogger().debug("Best effort mode didn't work to get seconds and/or percent viewed , can happen on live session " + e.getMessage());
}

buffer.append("\"presentationId\":" + "\"" + record.getField("ItemId").asString() + "\"");
buffer.append(",");
try {
buffer.append("\"userId\":" + record.getField("Userid").asLong());
}catch (NumberFormatException e)
{
buffer.append("\"userId\":-1" );
}

buffer.append("},");
}

}
if (buffer.length() > 0) {
buffer.setLength(buffer.length() - 1);
result.add(Optional.ofNullable("[ " + buffer + " ]"));
}
}
}
return result;
}

Optional<String> calculMimTyp(Record record, ProcessContext context) {
if (context.getPropertyValue(REQUEST_MIME_TYPE).isSet()) {
return Optional.ofNullable(context.getPropertyValue(REQUEST_MIME_TYPE.getName()).evaluate(record).asString());
Expand All @@ -226,4 +307,33 @@ Optional<String> calculVerb(Record record, ProcessContext context) {
}
return Optional.empty();
}

/*
tag1=valuea,valueb;tag2=valuea,valuec
if the record contains one of the tag of the property with one of the value for this tag it will return true
*/
Boolean triggerRestCall(Record record, ProcessContext context) {
AtomicBoolean result = new AtomicBoolean(false);
if (context.getPropertyValue(TAG_KEY_VALUE).isSet()) {
String tag_list = context.getPropertyValue(TAG_KEY_VALUE).asString();
String [] keyValuePairs = tag_list.split(";");
Map<String,String> map = new HashMap<>();
for(String pair : keyValuePairs) //iterate over the pairs
{
String[] entry = pair.split("="); //split the pairs to get key and value
map.put(entry[0].trim(), entry[1].trim()); //add them to the hashmap and trim whitespaces
}
map.forEach( (k,v) -> { //k here is tag1 and tag2
if ( record.getField(k) != null && record.getField(k).isSet()){
String[] single_values = v.split(",");
List<String> stringList = new ArrayList<>(Arrays.asList(single_values)); // List(valuea,valueb) for tag1 List(valuea,valuec) for tag2
String recordValue = record.getField(k).asString();
if (stringList.contains(recordValue)){
result.set(true);
}
}
});
}
return result.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Copyright (C) 2016 Hurence ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hurence.logisland.rest.processor.lookup;


import com.hurence.logisland.annotation.documentation.CapabilityDescription;
import com.hurence.logisland.annotation.documentation.Tags;
import com.hurence.logisland.component.InitializationException;
import com.hurence.logisland.error.ErrorUtils;
import com.hurence.logisland.processor.ProcessContext;
import com.hurence.logisland.processor.ProcessError;
import com.hurence.logisland.record.Record;
import com.hurence.logisland.record.StandardRecord;
import io.reactivex.Maybe;
import io.vertx.core.Handler;
import io.vertx.reactivex.core.Promise;
import io.vertx.reactivex.core.Vertx;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

@Tags({"rest", "record", "http", "request", "call", "server"})
@CapabilityDescription("Execute an http request with specified verb, body and mime type. Then stock result as a Record in the specified field")
//@ExtraDetailFile("./details/common-processors/BulkPut-Detail.rst")
public class AsyncCallRequestBulkPostJson extends AbstractCallRequest
{
private Vertx vertx;

@Override
public void init(ProcessContext context) throws InitializationException {
super.init(context);
try {
vertx = Vertx.vertx();
} catch (Exception ex) {
throw new InitializationException(ex);
}
}

public void stop() {
if (vertx != null) {
vertx.close();
}
}

/**
* process events
*
* @param context
* @param records
* @return
*/
@Override
public Collection<Record> process(final ProcessContext context, final Collection<Record> records) {
if (records.isEmpty()) {
getLogger().warn("process has been called with an empty list of records !");
return records;
}
/**
* loop over events to add them to bulk
*/
getLogger().debug("Into the bulk " );

ArrayList<Optional<String>> requestBodies = concatBody(records, context);
getLogger().debug("Bulk body " +requestBodies );

if ( requestBodies !=null && !requestBodies.isEmpty() && records.stream().findFirst().isPresent()) {
for (Optional<String> bodies : requestBodies) {
if (bodies.isPresent()) {
Record record = records.stream().findFirst().get();
StandardRecord coordinates = new StandardRecord(record);

calculVerb(record, context).ifPresent(verb -> coordinates.setStringField(restClientService.getMethodKey(), verb));
calculMimTyp(record, context).ifPresent(mimeType -> coordinates.setStringField(restClientService.getMimeTypeKey(), mimeType));

coordinates.setStringField(restClientService.getbodyKey(), bodies.get());

getLogger().debug("Calling bulk for method " + coordinates.getField(restClientService.getMethodKey()).asString() +
" type " + coordinates.getField(restClientService.getMimeTypeKey()).asString() +
" with body " + coordinates.getField(restClientService.getbodyKey()).asString());
Handler<Promise<Optional<Record>>> callRequestHandler = p -> {
try {
p.complete(restClientService.lookup(coordinates));
} catch (Throwable t) { //There is other errors than LookupException, The proxyWrapper does wrap those into Reflection exceptions...
p.fail(t);
}
};

Maybe<Optional<Record>> response = vertx
.rxExecuteBlocking(callRequestHandler)
.doOnError(t -> {
ErrorUtils.handleError(getLogger(), t, record, ProcessError.RUNTIME_ERROR.getName());
})
.doOnSuccess(rspOpt -> {
rspOpt.ifPresent(rsp -> modifyRecord(record, rsp));
});
response.blockingGet();// wait until the request is done
}
}
}
getLogger().debug("Bulk ended " );
stop();
return records;
}
}
Loading