Skip to content

Commit

Permalink
NIFI-14272 Refactored MongoDB Client Creation (#9723)
Browse files Browse the repository at this point in the history
Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
pvillard31 authored Feb 17, 2025
1 parent 11c2151 commit 48d6845
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ public void testInsertOne() throws Exception {
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
out.assertContentEquals(bytes);

assertEquals(runner.getProvenanceEvents().get(0).getTransitUri(), MONGO_CONTAINER.getConnectionString());

// verify 1 doc inserted into the collection
assertEquals(1, collection.countDocuments());
assertEquals(doc, collection.find().first());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,11 @@

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;

import java.util.Arrays;
import java.util.List;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
Expand All @@ -43,6 +37,12 @@
import org.apache.nifi.ssl.SSLContextProvider;
import org.bson.Document;

import javax.net.ssl.SSLContext;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

@Tags({"mongo", "mongodb", "service"})
@CapabilityDescription(
"Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
Expand All @@ -53,7 +53,7 @@ public class MongoDBControllerService extends AbstractControllerService implemen

@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.uri = getURI(context);
this.uri = context.getProperty(URI).evaluateAttributeExpressions().getValue();
this.mongoClient = createClient(context, this.mongoClient);
}

Expand Down Expand Up @@ -90,8 +90,25 @@ protected MongoClient createClient(ConfigurationContext context, MongoClient exi
}

try {
final String uri = getURI(context);
final MongoClientSettings.Builder builder = getClientSettings(uri, sslContext);
final String uri = context.getProperty(URI).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();

final MongoClientSettings.Builder builder = MongoClientSettings.builder();
final ConnectionString cs = new ConnectionString(uri);

if (user != null && passw != null) {
final String database = cs.getDatabase() == null ? "admin" : cs.getDatabase();
final MongoCredential credential = MongoCredential.createCredential(user, database, passw.toCharArray());
builder.credential(credential);
}

if (sslContext != null) {
builder.applyToSslSettings(sslBuilder -> sslBuilder.enabled(true).context(sslContext));
}

builder.applyConnectionString(cs);

final MongoClientSettings clientSettings = builder.build();
return MongoClients.create(clientSettings);
} catch (Exception e) {
Expand All @@ -100,17 +117,6 @@ protected MongoClient createClient(ConfigurationContext context, MongoClient exi
}
}

protected MongoClientSettings.Builder getClientSettings(final String uri, final SSLContext sslContext) {
final MongoClientSettings.Builder builder = MongoClientSettings.builder();
builder.applyConnectionString(new ConnectionString(uri));
if (sslContext != null) {
builder.applyToSslSettings(sslBuilder ->
sslBuilder.enabled(true).context(sslContext)
);
}
return builder;
}

@OnStopped
public final void onStopped() {
closeClient(mongoClient);
Expand All @@ -122,17 +128,6 @@ private void closeClient(MongoClient client) {
}
}

protected String getURI(final ConfigurationContext context) {
final String uri = context.getProperty(URI).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
if (!uri.contains("@") && user != null && passw != null) {
return uri.replaceFirst("://", "://" + URLEncoder.encode(user, StandardCharsets.UTF_8) + ":" + URLEncoder.encode(passw, StandardCharsets.UTF_8) + "@");
} else {
return uri;
}
}

@Override
public WriteConcern getWriteConcern() {
WriteConcern writeConcern = null;
Expand Down

0 comments on commit 48d6845

Please sign in to comment.