diff --git a/build.gradle b/build.gradle index c501025..79f0fa6 100644 --- a/build.gradle +++ b/build.gradle @@ -49,7 +49,7 @@ dependencies { implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.14.0' // https://mvnrepository.com/artifact/org.apache.commons/commons-compress - implementation("org.apache.commons:commons-compress:1.26.1") { + implementation("org.apache.commons:commons-compress:1.26.2") { exclude group: 'com.github.luben', module: 'zstd-jni' } implementation 'com.github.luben:zstd-jni:1.5.6-3' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often. @@ -116,7 +116,7 @@ dependencies { implementation 'org.json:json:20240303' // This is used only in "ParquetFileUtils.createRemoteParquetDirectories()". TODO - Replace it with "gson". // https://mvnrepository.com/artifact/com.google.code.gson/gson - implementation 'com.google.code.gson:gson:2.10.1' + implementation 'com.google.code.gson:gson:2.11.0' // https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.13.0' diff --git a/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java b/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java index 60420e3..4e64f65 100644 --- a/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java +++ b/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java @@ -9,7 +9,10 @@ import com.google.gson.Gson; import eu.openaire.urls_controller.util.GenericUtils; import java.util.Collection; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @JsonInclude(JsonInclude.Include.NON_NULL) @@ -34,6 +37,8 @@ public class BulkImportReport { @JsonProperty private Map> eventsMap; + transient private final Lock reportLock = new ReentrantLock(true); + public BulkImportReport(String provenance, String reportLocation, String reportID) { this.provenance = provenance; @@ -47,13 +52,16 @@ public class BulkImportReport { } /** - * Synchronize it to avoid concurrency issues when concurrent calls are made to the same bulkImport-Report object. + * Synchronize it with a lock, to avoid concurrency issues when concurrent calls are made to the same bulkImport-Report object. * */ - public synchronized String getJsonReport() + public String getJsonReport() { + reportLock.lock(); //Convert the LinkedHashMultiMap to Map>, since Gson cannot serialize Multimaps. - eventsMap = eventsMultimap.asMap(); - return gson.toJson(this, BulkImportReport.class); + eventsMap = new HashMap<>(eventsMultimap.asMap()); // Make sure we use a clone of the original data, in order to avoid any exception in the "gson.toJson()" method, when at the same time another thread modifies the "eventsMultimap". + String reportToReturn = gson.toJson(this, BulkImportReport.class); + reportLock.unlock(); + return reportToReturn; } public String getProvenance() { diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 50e1167..4245f98 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -564,7 +564,7 @@ public class UrlsServiceImpl implements UrlsService { Throwable cause = e.getCause(); String exMsg; if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") ) { - logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\"!"); + logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\" message!"); workerInfo.setHasShutdown(true); // Avoid sending possible shutdown-Requests later on. Also show a specific message if this Worker requests new assignments in the future. } else logger.error(errorMsg, e); diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 5afc57b..f483c81 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -462,7 +462,7 @@ public class FileUtils { if ( shouldLockThreads ) // In case multiple threads write to the same file. for ex. during the bulk-import procedure. fileAccessLock.lock(); - // TODO - Make this method to be synchronized be specific file, not in general. + // TODO - Make this method to be synchronized for the specific file, not in general. // TODO - NOW: Multiple bulkImport procedures (with diff DIRs), are blocked while writing to DIFFERENT files.. try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), halfMb) )