Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release/0.7.0' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
hhund committed Oct 18, 2022
2 parents d1b1868 + f0a7057 commit 7de3bb8
Show file tree
Hide file tree
Showing 87 changed files with 924 additions and 621 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ codex-processes-ap1-test-data-generator/rsa/*.pem
###
codex-processes-ap1-docker-test-setup/**/bpe/log/*.log
codex-processes-ap1-docker-test-setup/**/bpe/log/*.log.gz
codex-processes-ap1-docker-test-setup/**/bpe/last_event/time.file
codex-processes-ap1-docker-test-setup/**/bpe/plugin/*.jar
codex-processes-ap1-docker-test-setup/**/bpe/process/*.jar

Expand Down
3 changes: 2 additions & 1 deletion codex-process-data-transfer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>de.netzwerk-universitaetsmedizin.codex</groupId>
<artifactId>codex-processes-ap1</artifactId>
<version>0.6.0</version>
<version>0.7.0</version>
</parent>

<properties>
Expand All @@ -23,6 +23,7 @@
<dependency>
<groupId>org.highmed.dsf</groupId>
<artifactId>dsf-tools-documentation-generator</artifactId>
<scope>provided</scope>
</dependency>

<!-- must be added as regular DSF plugin -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public interface ConstantsDataTransfer
String CODESYSTEM_NUM_CODEX_DATA_TRANSFER_VALUE_DATA_REFERENCE = "data-reference";
String CODESYSTEM_NUM_CODEX_DATA_TRANSFER_VALUE_DRY_RUN = "dry-run";
String CODESYSTEM_NUM_CODEX_DATA_TRANSFER_VALUE_ENCRYPTED_BUNDLE_SIZE = "encrypted-bundle-size";
String CODESYSTEM_NUM_CODEX_DATA_TRANSFER_VALUE_LOCAL_VALIDATION_SUCCESSFUL = "local-validation-successful";
String CODESYSTEM_NUM_CODEX_DATA_TRANSFER_VALUE_ENCRYPTED_BUNDLE_RESOURCES_COUNT = "encrypted-bundle-resources-count";

String PROFILE_NUM_CODEX_TASK_DATA_TRIGGER_PROCESS_URI = "http://www.netzwerk-universitaetsmedizin.de/bpe/Process/dataTrigger/";
String PROFILE_NUM_CODEX_TASK_DATA_TRIGGER_PROCESS_URI_AND_LATEST_VERSION = PROFILE_NUM_CODEX_TASK_DATA_TRIGGER_PROCESS_URI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class DataTransferProcessPluginDefinition implements ProcessPluginDefinit
{
private static final Logger logger = LoggerFactory.getLogger(DataTransferProcessPluginDefinition.class);

public static final String VERSION = "0.6.0";
public static final LocalDate DATE = LocalDate.of(2022, 7, 12);
public static final String VERSION = "0.7.0";
public static final LocalDate DATE = LocalDate.of(2022, 10, 18);

@Override
public String getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ private static final class FttpClientStub implements FttpClient
{
private static final Logger logger = LoggerFactory.getLogger(FttpClientStub.class);

private static final String DIC_PSEUDONYM = "source2/original2";
private static final Pattern DIC_PSEUDONYM_PATTERN = Pattern.compile(PSEUDONYM_PATTERN_STRING);

@Override
Expand Down Expand Up @@ -64,8 +65,9 @@ public Optional<String> getCrrPseudonym(String dicSourceAndPseudonym)
@Override
public Optional<String> getDicPseudonym(String bloomFilter)
{
logger.info("Requesting DIC pseudonym for bloom filter {} ", bloomFilter);
return Optional.of("source2/original2");
logger.warn("Returning simulated DIC pseudonym '{}' for bloom filter '{}', fTTP connection not configured.",
DIC_PSEUDONYM, bloomFilter);
return Optional.of(DIC_PSEUDONYM);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final class GeccoFhirClientStub implements GeccoFhirClient
private static final Logger logger = LoggerFactory.getLogger(GeccoFhirClientStub.class);

private static final String condition = "{\"resourceType\":\"Condition\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/chronic-lung-diseases\"]},\"clinicalStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-clinical\",\"code\":\"active\",\"display\":\"Active\"}]},\"verificationStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-ver-status\",\"code\":\"confirmed\",\"display\":\"Confirmed\"},{\"system\":\"http://snomed.info/sct\",\"code\":\"410605003\",\"display\":\"Confirmed present (qualifier value)\"}]},\"category\":[{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"418112009\",\"display\":\"Pulmonary medicine\"}]}],\"code\":{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"413839001\",\"display\":\"Chronic lung disease\"}]},\"recordedDate\":\"2020-11-10T15:50:41.000+01:00\"}";
private static final String patient = "{\"resourceType\":\"Patient\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/Patient\"]},\"extension\":[{\"url\":\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/ethnic-group\",\"valueCoding\":{\"system\":\"http://snomed.info/sct\",\"code\":\"186019001\",\"display\":\"Other ethnic, mixed origin\"}},{\"url\":\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/age\",\"extension\":[{\"url\":\"dateTimeOfDocumentation\",\"valueDateTime\":\"2020-10-01\"},{\"url\":\"age\",\"valueAge\":{\"value\":67,\"unit\":\"years\",\"system\":\"http://unitsofmeasure.org\",\"code\":\"a\"}}]}],\"birthDate\":\"1953-09-30\"}";
private static final String patient = "{\"resourceType\":\"Patient\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/Patient\"]},\"extension\":[{\"url\":\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/ethnic-group\",\"valueCoding\":{\"system\":\"http://snomed.info/sct\",\"code\":\"26242008\",\"display\":\"Mixed (qualifier value)\"}},{\"url\":\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/age\",\"extension\":[{\"url\":\"dateTimeOfDocumentation\",\"valueDateTime\":\"2020-10-01\"},{\"url\":\"age\",\"valueAge\":{\"value\":67,\"unit\":\"years\",\"system\":\"http://unitsofmeasure.org\",\"code\":\"a\"}}]}],\"birthDate\":\"1953-09-30\"}";
private static final String observation = "{\"resourceType\":\"Observation\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/sars-cov-2-rt-pcr\"]},\"identifier\":[{\"type\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/v2-0203\",\"code\":\"OBI\"}]}}],\"status\":\"final\",\"category\":[{\"coding\":[{\"system\":\"http://loinc.org\",\"code\":\"26436-6\"},{\"system\":\"http://terminology.hl7.org/CodeSystem/observation-category\",\"code\":\"laboratory\"}]}],\"code\":{\"coding\":[{\"system\":\"http://loinc.org\",\"code\":\"94500-6\",\"display\":\"SARS-CoV-2 (COVID-19) RNA [Presence] in Respiratory specimen by NAA with probe detection\"}],\"text\":\"SARS-CoV-2-RNA (PCR)\"},\"effectiveDateTime\":\"2020-11-10T15:50:41.000+01:00\",\"valueCodeableConcept\":{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"260373001\",\"display\":\"Detected (qualifier value)\"}],\"text\":\"SARS-CoV-2-RNA positiv\"}}";

private final GeccoClient geccoClient;
Expand All @@ -57,7 +57,9 @@ public void storeBundle(Bundle bundle)
@Override
public PatientReferenceList getPatientReferencesWithNewData(DateWithPrecision exportFrom, Date exportTo)
{
logger.warn("Returning demo pseudonyms for {}", geccoClient.getLocalIdentifierValue());
logger.warn(
"Returning four simulated patient references for {}, connection to GECCO FHIR server not configured",
geccoClient.getLocalIdentifierValue());

PatientReference reference1 = PatientReference
.from(new Identifier().setSystem(NAMING_SYSTEM_NUM_CODEX_DIC_PSEUDONYM).setValue("dic_foo/bar"));
Expand All @@ -79,18 +81,23 @@ public PatientReferenceList getPatientReferencesWithNewData(DateWithPrecision ex
@Override
public Stream<DomainResource> getNewData(String pseudonym, DateWithPrecision exportFrom, Date exportTo)
{
logger.warn("Returning demo resources for {}", pseudonym);
logger.warn(
"Returning simulated GECCO FHIR resources (Patient, Condition, Observation) for {}, connection to GECCO FHIR server not configured",
pseudonym);

Patient p = geccoClient.getFhirContext().newJsonParser().parseResource(Patient.class, patient);
p.setIdElement(new IdType(UUID.randomUUID().toString()));
p.addIdentifier().setSystem(NAMING_SYSTEM_NUM_CODEX_DIC_PSEUDONYM).setValue(pseudonym).getType()
.getCodingFirstRep().setSystem(IDENTIFIER_NUM_CODEX_DIC_PSEUDONYM_TYPE_SYSTEM)
.setCode(IDENTIFIER_NUM_CODEX_DIC_PSEUDONYM_TYPE_CODE);
p.setIdElement(new IdType("Patient", UUID.randomUUID().toString()));

Condition c = geccoClient.getFhirContext().newJsonParser().parseResource(Condition.class, condition);
c.setIdElement(new IdType(UUID.randomUUID().toString()));
c.setSubject(new Reference(p.getIdElement()));

Observation o = geccoClient.getFhirContext().newJsonParser().parseResource(Observation.class, observation);
o.setIdElement(new IdType(UUID.randomUUID().toString()));
o.setSubject(new Reference(p.getIdElement()));

return Stream.of(p, c, o);
Expand All @@ -99,6 +106,9 @@ public Stream<DomainResource> getNewData(String pseudonym, DateWithPrecision exp
@Override
public Optional<Patient> getPatient(String reference)
{
logger.warn("Returning simulated patient resource for {}, connection to GECCO FHIR server not configured",
reference);

Patient p = geccoClient.getFhirContext().newJsonParser().parseResource(Patient.class, patient);
p.addIdentifier().setSystem(ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_BLOOM_FILTER).setValue(
"J75gYl+RiKSsxeu33tixBEEtFGCZwIEsWIKgvESaluvpSGBte/SBNZilz+sLSZdHSDKTL2J2d1yZsakqjtV5U2SMMJZ5IF3gEk1MT3sCRkxXEo1aJWKpnqndUTR+fvtSeMFj0y/O5yqrLV9zU79CNiTfZN5t1/6XGxZUXq2DovfCRrrpRxWjFwjKIDo0OkRANf7Mqp+Fsu0Un53JF57p/p1RLpWcJkC3xO+UslGbDo3mjgczdvxz0aLmWNA7/NIhk+Q50gxCX3B4QrntPfLLlBkrmIpsKRcLFVuYZik7pYZ9prd0qCLQ9tc8qiw1ry5kMfIvLnIS/FV36w==")
Expand All @@ -113,5 +123,7 @@ public Optional<Patient> getPatient(String reference)
public void updatePatient(Patient patient)
{
// Nothing to do in stub client
logger.info(
"Not updating patient resource in GECCO FHIR server, connection to GECCO FHIR server not configured");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class CrrKeyProviderImpl implements CrrKeyProvider

/**
* One or both parameters should be <code>null</code>
*
*
* @param crrPrivateKeyFile
* @param crrPublicKeyFile
* @return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package de.netzwerk_universitaetsmedizin.codex.processes.data_transfer.listener;

import static de.netzwerk_universitaetsmedizin.codex.processes.data_transfer.ConstantsDataTransfer.CODESYSTEM_NUM_CODEX_DATA_TRANSFER;
import static de.netzwerk_universitaetsmedizin.codex.processes.data_transfer.ConstantsDataTransfer.CODESYSTEM_NUM_CODEX_DATA_TRANSFER_VALUE_LOCAL_VALIDATION_SUCCESSFUL;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Objects;

import javax.activation.DataHandler;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMultipart;
import javax.mail.util.ByteArrayDataSource;

import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.highmed.dsf.bpe.service.MailService;
import org.highmed.dsf.fhir.client.FhirWebserviceClientProvider;
import org.highmed.dsf.fhir.task.TaskHelper;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Task;
import org.hl7.fhir.r4.model.Task.TaskOutputComponent;
import org.hl7.fhir.r4.model.Task.TaskStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import ca.uhn.fhir.context.FhirContext;
import de.netzwerk_universitaetsmedizin.codex.processes.data_transfer.DataTransferProcessPluginDefinition;

public class AfterDryRunEndListener implements ExecutionListener, InitializingBean
{
private static final Logger logger = LoggerFactory.getLogger(AfterDryRunEndListener.class);

private final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

private final TaskHelper taskHelper;
private final FhirWebserviceClientProvider clientProvider;
private final FhirContext fhirContext;
private final MailService mailService;

private final String localOrganizationIdentifierValue;
private final boolean sendDryRunSuccessMail;

public AfterDryRunEndListener(TaskHelper taskHelper, FhirWebserviceClientProvider clientProvider,
FhirContext fhirContext, MailService mailService, String localOrganizationIdentifierValue,
boolean sendDryRunSuccessMail)
{
this.taskHelper = taskHelper;
this.clientProvider = clientProvider;
this.fhirContext = fhirContext;
this.mailService = mailService;

this.localOrganizationIdentifierValue = localOrganizationIdentifierValue;
this.sendDryRunSuccessMail = sendDryRunSuccessMail;
}

@Override
public void afterPropertiesSet() throws Exception
{
Objects.requireNonNull(taskHelper, "taskHelper");
Objects.requireNonNull(clientProvider, "clientProvider");
Objects.requireNonNull(fhirContext, "fhirContext");
Objects.requireNonNull(mailService, "mailService");

Objects.requireNonNull(localOrganizationIdentifierValue, "localOrganizationIdentifierValue");
}

@Override
public void notify(DelegateExecution execution) throws Exception
{
if (!sendDryRunSuccessMail)
return;

Task task = taskHelper.getLeadingTaskFromExecutionVariables(execution);
Task finalTask = clientProvider.getLocalWebserviceClient().read(Task.class, task.getIdElement().getIdPart());

if (!TaskStatus.COMPLETED.equals(finalTask.getStatus()))
{
logger.warn("Final Task from DSF FHIR server not in status {} but {}, not sending dry-run success mail",
TaskStatus.COMPLETED, finalTask.getStatus());
return;
}

if (!isLocalValidationSuccessful(finalTask))
{
logger.warn(
"Final Task from DSF FHIR server missing '{}' output parameter with value 'true', not sending dry-run success mail",
CODESYSTEM_NUM_CODEX_DATA_TRANSFER_VALUE_LOCAL_VALIDATION_SUCCESSFUL);
return;
}

String finalTaskAsXml = fhirContext.newXmlParser().setPrettyPrint(true).encodeResourceToString(finalTask);
String attachmentFilename = finalTask.getIdElement().getIdPart() + ".xml";

MimeBodyPart text = new MimeBodyPart();
text.setText(createMesssage(finalTask, attachmentFilename));

MimeBodyPart attachment = new MimeBodyPart();
attachment.setFileName(attachmentFilename);
attachment.setDataHandler(new DataHandler(
new ByteArrayDataSource(finalTaskAsXml.getBytes(StandardCharsets.UTF_8), "application/xml")));

MimeMultipart body = new MimeMultipart();
body.addBodyPart(text);
body.addBodyPart(attachment);

MimeBodyPart message = new MimeBodyPart();
message.setContent(body);

mailService.send("Dry-Run Success: " + localOrganizationIdentifierValue, message);
}

private boolean isLocalValidationSuccessful(Task task)
{
return task.getOutput().stream().filter(TaskOutputComponent::hasType).filter(o -> o.getType().hasCoding())
.filter(o -> o.getType().getCoding().stream()
.anyMatch(c -> CODESYSTEM_NUM_CODEX_DATA_TRANSFER.equals(c.getSystem())
&& CODESYSTEM_NUM_CODEX_DATA_TRANSFER_VALUE_LOCAL_VALIDATION_SUCCESSFUL
.equals(c.getCode())))
.filter(TaskOutputComponent::hasValue).filter(o -> o.getValue() instanceof BooleanType)
.map(o -> (BooleanType) o.getValue()).anyMatch(b -> Boolean.TRUE.equals(b.getValue()));
}

private String createMesssage(Task finalTask, String attachmentFileName)
{
StringBuilder b = new StringBuilder();

b.append("Send process version ");
b.append(DataTransferProcessPluginDefinition.VERSION);
b.append(" dry-run at organization with identifier '");
b.append(localOrganizationIdentifierValue);
b.append("' successfully completed on ");
b.append(DATE_FORMAT.format(finalTask.getMeta().getLastUpdated()));
b.append(".\n\nTask resource is attached as '");
b.append(attachmentFileName);
b.append("'");

return b.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,41 +1,90 @@
package de.netzwerk_universitaetsmedizin.codex.processes.data_transfer.logging;

import java.util.Objects;

import org.highmed.dsf.bpe.service.MailService;
import org.hl7.fhir.r4.model.IdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

public class ErrorLogger
public class ErrorLogger implements InitializingBean
{
private static final Logger validationLogger = LoggerFactory.getLogger("validation-error-logger");
private static final Logger errorLogger = LoggerFactory.getLogger("error-logger");

private final MailService mailService;

private final boolean sendValidationFailedMail;
private final boolean sendProcessFailedMail;

public ErrorLogger(MailService mailService, boolean sendValidationFailedMail, boolean sendProcessFailedMail)
{
this.mailService = mailService;

this.sendValidationFailedMail = sendValidationFailedMail;
this.sendProcessFailedMail = sendProcessFailedMail;
}

@Override
public void afterPropertiesSet() throws Exception
{
Objects.requireNonNull(mailService, "mailService");
}

public void logValidationFailed(IdType taskId)
{
validationLogger.debug("Validation of FHIR resources failed, started by Task {}", taskId.getValue());
validationLogger.debug("Validation of FHIR resources failed in process started by {}",
taskId.toVersionless().getValue());

if (sendValidationFailedMail)
mailService.send("Validation Error",
"Validation of FHIR resources failed in process started by " + taskId.toVersionless().getValue());
}

public void logValidationFailedLocal(IdType taskId)
{
validationLogger.debug("Local validation of FHIR resources failed, started by Task {}", taskId.getValue());
validationLogger.debug("Local validation of FHIR resources failed in process started by {}",
taskId.toVersionless().getValue());

if (sendValidationFailedMail)
mailService.send("Validation Error", "Local validation of FHIR resources failed in process started by "
+ taskId.toVersionless().getValue());
}

public void logValidationFailedRemote(IdType taskId)
{
validationLogger.debug("Remote validation of FHIR resources failed, started by Task {}", taskId.getValue());
validationLogger.debug("Remote validation of FHIR resources failed in process started by {}",
taskId.toVersionless().getValue());

if (sendValidationFailedMail)
mailService.send("Validation Error", "Remote validation of FHIR resources failed in process started by "
+ taskId.toVersionless().getValue());
}

public void logDataSendFailed(IdType taskId)
{
errorLogger.debug("Send process failed, started by Task {}", taskId.getValue());
errorLogger.debug("Send process failed started by {}", taskId.toVersionless().getValue());

if (sendProcessFailedMail)
mailService.send("Proccess Failed", "Send process failed started by " + taskId.toVersionless().getValue());
}

public void logDataTranslateFailed(IdType taskId)
{
errorLogger.debug("Translate process failed, started by Task {}", taskId.getValue());
errorLogger.debug("Translate process failed started by {}", taskId.toVersionless().getValue());

if (sendProcessFailedMail)
mailService.send("Proccess Failed",
"Translate process failed started by " + taskId.toVersionless().getValue());
}

public void logDataReceiveFailed(IdType taskId)
{
errorLogger.debug("Receive process failed, started by Task {}", taskId.getValue());
errorLogger.debug("Receive process failed started by {}", taskId.toVersionless().getValue());

if (sendProcessFailedMail)
mailService.send("Proccess Failed",
"Receive process failed started by " + taskId.toVersionless().getValue());
}
}
Loading

0 comments on commit 7de3bb8

Please sign in to comment.