Compare commits

...

7 Commits

Author SHA1 Message Date
Lampros Smyrnaios edf064616a Improve error-handling in "BulkImportReport.getJsonReport()" and "FileUtils.writeToFile()". 2024-05-30 11:52:04 +03:00
Lampros Smyrnaios d7697ef3f8 - Tighten the thread-safety protection on the "BulkImportReport.getJsonReport()" method.
- Update dependencies.
- Code polishing.
2024-05-27 10:40:05 +03:00
Lampros Smyrnaios b6ad2af48b - Reduce occupying space at any given time, by deleting the archives right after decompression and files-extraction.
- Code refactoring.
2024-05-22 12:11:39 +03:00
Lampros Smyrnaios e2e7ca72d5 - Fix not allowing the user to use the "shutdownAllWorkersGracefully" endpoint twice.
- Code optimization.
- Update dependencies.
2024-05-21 23:43:49 +03:00
Lampros Smyrnaios 39c36f9e66 - Resolve a concurrency issue, by enforcing synchronization on the "BulkImportReport.getJsonReport()" method.
- Increase the number of stacktrace-lines to 20, for bulkImport-segment-failures.
- Improve "GenericUtils.getSelectiveStackTrace()".
2024-05-01 01:29:25 +03:00
Lampros Smyrnaios 8e14d4dbe0 - Fix not counting the files from the bulkImport-segment, which failed due to an exception.
- Write segment-exception-messages to the bulkImport-report.
2024-04-30 23:43:52 +03:00
Lampros Smyrnaios 0d117743c2 - Code-optimization.
- Upload the updated gradle-wrapper and set using the latest Gradle version in "installAndRun.sh" script.
2024-04-30 02:13:08 +03:00
11 changed files with 529 additions and 400 deletions

View File

@ -1,12 +1,12 @@
plugins {
id 'org.springframework.boot' version '2.7.18'
id 'io.spring.dependency-management' version '1.1.4'
id 'io.spring.dependency-management' version '1.1.5'
id 'java'
}
java {
group = 'eu.openaire.urls_controller'
version = '2.7.0-SNAPSHOT'
version = '2.7.3-SNAPSHOT'
sourceCompatibility = JavaVersion.VERSION_1_8
}
@ -43,18 +43,18 @@ dependencies {
//implementation group: 'jakarta.validation', name: 'jakarta.validation-api', version: '3.0.2'
// https://mvnrepository.com/artifact/com.google.guava/guava
implementation group: 'com.google.guava', name: 'guava', version: '33.1.0-jre'
implementation group: 'com.google.guava', name: 'guava', version: '33.2.0-jre'
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.14.0'
// https://mvnrepository.com/artifact/org.apache.commons/commons-compress
implementation("org.apache.commons:commons-compress:1.26.1") {
implementation("org.apache.commons:commons-compress:1.26.2") {
exclude group: 'com.github.luben', module: 'zstd-jni'
}
implementation 'com.github.luben:zstd-jni:1.5.6-3' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often.
implementation 'io.minio:minio:8.5.9'
implementation 'io.minio:minio:8.5.10'
// https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
implementation("com.cloudera.impala:jdbc:2.5.31") {
@ -77,7 +77,7 @@ dependencies {
}
// https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro
implementation('org.apache.parquet:parquet-avro:1.13.1')
implementation('org.apache.parquet:parquet-avro:1.14.0')
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common
implementation("org.apache.hadoop:hadoop-common:$hadoopVersion") {
@ -116,10 +116,10 @@ dependencies {
implementation 'org.json:json:20240303' // This is used only in "ParquetFileUtils.createRemoteParquetDirectories()". TODO - Replace it with "gson".
// https://mvnrepository.com/artifact/com.google.code.gson/gson
implementation 'com.google.code.gson:gson:2.10.1'
implementation 'com.google.code.gson:gson:2.11.0'
// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus
runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.12.5'
runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.13.0'
testImplementation 'org.springframework.security:spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test"

Binary file not shown.

View File

@ -26,7 +26,7 @@ if [[ justRun -eq 1 && shouldRunInDocker -eq 1 ]]; then
justRun=0
fi
gradleVersion="8.6"
gradleVersion="8.7"
if [[ justRun -eq 0 ]]; then

View File

@ -153,7 +153,11 @@ public class ShutdownController {
logger.info(initMsg);
workerInfo.setHasShutdown(true); // This will update the map.
UrlsController.numOfActiveWorkers.decrementAndGet();
int numActiveWorkers = UrlsController.numOfActiveWorkers.decrementAndGet();
if ( (numActiveWorkers == 0) && shouldShutdownAllWorkers ) { // If all workers have shut down and "shouldShutdownAllWorkers" was set, reset the indicator to allow future shutdowns.
logger.info("All workers have shut down.");
shouldShutdownAllWorkers = false; // Make sure a shutdown of all the workers can be requested again, in case the user starts new workers later and wants to shut them down in the future.
}
// Return "HTTP-OK" to this worker. If this was part of a shutdown-service request, then wait for the scheduler to check and shutdown the service.
return ResponseEntity.ok().build();
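
For context on the change above: capturing the return value of decrementAndGet() (instead of decrementing and then reading the counter separately) guarantees that exactly one thread observes the count reaching zero, so the one-time reset of "shouldShutdownAllWorkers" cannot race. A minimal sketch of the pattern, with hypothetical class and field names:

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: the value returned by decrementAndGet() is unique per call,
// so exactly one caller ever sees 0 and performs the one-time reset.
class ShutdownFlagSketch {
    private final AtomicInteger activeWorkers = new AtomicInteger(3);
    private volatile boolean shouldShutdownAllWorkers = true;

    void onWorkerShutdown() {
        int remaining = activeWorkers.decrementAndGet(); // atomic: no gap between decrement and read
        if ( (remaining == 0) && shouldShutdownAllWorkers )
            shouldShutdownAllWorkers = false; // executed by exactly one thread
    }
}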

View File

@ -7,14 +7,21 @@ import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class BulkImportReport {
private static final Logger logger = LoggerFactory.getLogger(BulkImportReport.class);
private static final Gson gson = new Gson(); // This is "transient" by default. It won't be included in any json object.
@JsonProperty
@ -34,6 +41,8 @@ public class BulkImportReport {
@JsonProperty
private Map<String, Collection<String>> eventsMap;
transient private final Lock reportLock = new ReentrantLock(true);
public BulkImportReport(String provenance, String reportLocation, String reportID) {
this.provenance = provenance;
@ -46,11 +55,23 @@ public class BulkImportReport {
eventsMultimap.put(GenericUtils.getReadableCurrentTimeAndZone(), event); // This is synchronized.
}
/**
* Synchronize it with a lock, to avoid concurrency issues when concurrent calls are made to the same bulkImport-Report object.
* */
public String getJsonReport()
{
//Convert the LinkedHashMultiMap<String, String> to Map<String, Collection<String>>, since Gson cannot serialize Multimaps.
eventsMap = eventsMultimap.asMap();
return gson.toJson(this, BulkImportReport.class);
String reportToReturn = null;
reportLock.lock();
try {
//Convert the LinkedHashMultiMap<String, String> to Map<String, Collection<String>>, since Gson cannot serialize Multimaps.
eventsMap = new HashMap<>(eventsMultimap.asMap()); // Make sure we use a clone of the original data, in order to avoid any exception in the "gson.toJson()" method, when at the same time another thread modifies the "eventsMultimap".
reportToReturn = gson.toJson(this, BulkImportReport.class);
} catch (Exception e) {
logger.error("Problem when producing the JSON-string with the BulkImportReport! | reportID: '" + reportID + "'", e);
} finally {
reportLock.unlock();
}
return reportToReturn; // It may be null.
}
public String getProvenance() {
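
The hunk above serializes the report under a fair ReentrantLock and copies the multimap's asMap() view into a plain HashMap before handing it to Gson, so the serializer never iterates a live view that another thread may be mutating through addEvent(). A stripped-down sketch of the same copy-then-serialize pattern (illustrative class, not the project's; assumes Guava and Gson on the classpath):

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class EventLogSketch {
    private final SetMultimap<String, String> events = Multimaps.synchronizedSetMultimap(HashMultimap.<String, String>create());
    private final Lock lock = new ReentrantLock(true); // fair lock, like "reportLock" above
    private final Gson gson = new Gson();

    void addEvent(String time, String event) {
        events.put(time, event); // individual puts are thread-safe through the synchronized view
    }

    String toJson() {
        lock.lock();
        try {
            // Snapshot the view; gson.toJson() then iterates the copy, not the live multimap.
            Map<String, Collection<String>> snapshot = new HashMap<>(events.asMap());
            return gson.toJson(snapshot);
        } finally {
            lock.unlock();
        }
    }
}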

View File

@ -156,13 +156,19 @@ public class BulkImportServiceImpl implements BulkImportService {
// The failed-to-be-imported files will not be deleted, even if the user requests that the directory be deleted.
} catch (ExecutionException ee) { // These can be serious errors like an "out of memory exception" (Java HEAP).
numFailedSegments ++;
logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Task_" + i + " failed with: ", additionalLoggingMsg, 15));
numAllFailedFiles += subLists.get(i).size(); // We assume all files of this segment failed, as all pass through the same parts of the code, so any serious exception should arise from the first files being processed and the rest of the files will be skipped.
logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Task_" + i + " failed with: ", additionalLoggingMsg, 20));
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ee.getCause().getMessage());
} catch (CancellationException ce) {
numFailedSegments ++;
numAllFailedFiles += subLists.get(i).size(); // We assume all files have failed.
logger.error("Task_" + i + " was cancelled: " + ce.getMessage() + additionalLoggingMsg);
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ce.getMessage());
} catch (InterruptedException ie) {
numFailedSegments ++;
numAllFailedFiles += (subLists.get(i).size() / 3); // In this case, only some of the files will have failed (we do not know how many).
logger.error("Task_" + i + " was interrupted: " + ie.getMessage());
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ie.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage() + additionalLoggingMsg);
}
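
The added branches above rely on how Future.get() surfaces failures: an ExecutionException wraps whatever the task itself threw (available via getCause()), a CancellationException means the task was cancelled, and an InterruptedException means the thread waiting on get() was interrupted. A compact sketch of the pattern (illustrative names only, not the project's code):

import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureDrainSketch {
    static void drain(List<Future<Integer>> futures) {
        for ( int i = 0; i < futures.size(); ++i ) {
            try {
                futures.get(i).get();
            } catch (ExecutionException ee) {
                System.err.println("Task_" + i + " failed with: " + ee.getCause()); // the task's own exception
            } catch (CancellationException ce) {
                System.err.println("Task_" + i + " was cancelled: " + ce.getMessage());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                System.err.println("Task_" + i + " was interrupted: " + ie.getMessage());
            }
        }
    }
}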

View File

@ -6,6 +6,7 @@ import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.FilesHandler;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import io.micrometer.core.annotation.Timed;
@ -49,6 +50,9 @@ public class UrlsServiceImpl implements UrlsService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private FilesHandler filesHandler;
@Autowired
private FileUtils fileUtils;
@ -277,7 +281,7 @@ public class UrlsServiceImpl implements UrlsService {
boolean hasFulltexts = true;
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, curWorkerId);
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = filesHandler.getAndUploadFullTexts(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, curWorkerId);
if ( uploadFullTextsResponse == null ) {
// Nothing to post to the Worker, since we do not have the worker's info.
// Rename the worker-report-file to indicate its "failure", so that the scheduler can pick it up and retry processing it.
@ -287,7 +291,7 @@ public class UrlsServiceImpl implements UrlsService {
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
fileUtils.removeUnretrievedFullTextsFromUrlReports(urlReports, false);
filesHandler.removeUnretrievedFullTextsFromUrlReports(urlReports, false);
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
@ -560,7 +564,7 @@ public class UrlsServiceImpl implements UrlsService {
Throwable cause = e.getCause();
String exMsg;
if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") ) {
logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\"!");
logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\" message!");
workerInfo.setHasShutdown(true); // Avoid sending possible shutdown-Requests later on. Also show a specific message if this Worker requests new assignments in the future.
} else
logger.error(errorMsg, e);

View File

@ -1,29 +1,64 @@
package eu.openaire.urls_controller.util;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.models.Payload;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
@Component
public class FileDecompressor {
private static final Logger logger = LoggerFactory.getLogger(FileDecompressor.class);
@Autowired
private FileUtils fileUtils;
boolean decompressAndUploadFullTexts(String zstdFileFullPath, Path curBatchPath, String targetDirectory, List<String> fileNamesForCurBatch, int batchCounter, SetMultimap<String, Payload> allFileNamesWithPayloads, long assignmentsBatchCounter)
{
try {
decompressFiles(zstdFileFullPath, curBatchPath);
String[] extractedFileNames = new File(targetDirectory).list();
if ( extractedFileNames == null ) {
logger.error("There was an error when acquiring the list of extracted files of directory: " + targetDirectory);
return false;
} else if ( extractedFileNames.length == 0 ) { // The directory will be empty if the full-texts failed to be decompressed or untarred, since the archives themselves are deleted right after they are processed (unless their deletion failed).
logger.error("No full-texts' fileNames were extracted from directory: " + targetDirectory);
return false;
} else if ( extractedFileNames.length != fileNamesForCurBatch.size() ) {
logger.warn("The number of extracted files (" + extractedFileNames.length + ") was not equal to the number of files (" + fileNamesForCurBatch.size() + ") of the current batch_" + batchCounter);
// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
// since, at the end of each assignments-batch, an iteration will be made and the app will set the file-data of all the non-retrieved and non-uploaded full-texts to null.
}
fileUtils.uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads, batchCounter);
return true;
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
return false;
} finally {
fileUtils.deleteDirectory(curBatchPath.toFile());
}
}
public void decompressFiles(String zstdSource, Path targetDir) throws Exception
{
// Decompress the zstd file.
Path tarPath = Paths.get(StringUtils.replace(zstdSource, ".zstd", "", 1)); // Remove the ".zstd" extension.
String tarPathStr = StringUtils.replace(zstdSource, ".zstd", "", 1); // Remove the ".zstd" extension.
Path tarPath = Paths.get(tarPathStr);
int readByte = -1;
try ( ZstdCompressorInputStream zsIn = new ZstdCompressorInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(zstdSource)), FileUtils.tenMb));
@ -31,6 +66,8 @@ public class FileDecompressor {
{
while ( (readByte = zsIn.read()) != -1 )
out.write(readByte);
} finally {
fileUtils.deleteFile(zstdSource); // Delete the initial zstd file.
}
// Now we have a decompressed tar-file, which we will Un-tar, in order to extract the full-text files.
@ -46,6 +83,8 @@ public class FileDecompressor {
} // The exception will be given to the caller method.
// No need to close the tarEntry (no "close()" method is provided).
}
} finally {
fileUtils.deleteFile(tarPathStr); // Delete the decompressed tar file.
}
// Now we have a batch-directory which contains the extracted full-text files (the archives have been deleted right after processing, unless their deletion failed).
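
Since the hunks above show decompressFiles() only in fragments, here is a self-contained sketch of the flow it implements: zstd-decompress the archive into a tar file, un-tar the full-texts, and delete each archive as soon as it has been consumed (the space-saving behaviour introduced in commit b6ad2af48b). It assumes commons-compress plus zstd-jni on the classpath; the names are illustrative, not the project's:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class ZstdTarExtractSketch {
    static void extract(Path zstdFile, Path targetDir) throws Exception {
        Path tarFile = Paths.get(zstdFile.toString().replace(".zstd", "")); // "batch.tar.zstd" -> "batch.tar"
        try ( InputStream in = new BufferedInputStream(Files.newInputStream(zstdFile));
              ZstdCompressorInputStream zstdIn = new ZstdCompressorInputStream(in) ) {
            Files.copy(zstdIn, tarFile); // write the decompressed tar
        } finally {
            Files.deleteIfExists(zstdFile); // free the space of the ".tar.zstd" right away
        }
        try ( InputStream in = new BufferedInputStream(Files.newInputStream(tarFile));
              TarArchiveInputStream tarIn = new TarArchiveInputStream(in) ) {
            TarArchiveEntry entry;
            while ( (entry = tarIn.getNextTarEntry()) != null ) {
                if ( ! entry.isDirectory() )
                    Files.copy(tarIn, targetDir.resolve(entry.getName())); // reads only the current entry
            }
        } finally {
            Files.deleteIfExists(tarFile); // free the space of the ".tar" as well
        }
    }
}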

View File

@ -3,11 +3,9 @@ package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import org.apache.commons.io.FileDeleteStrategy;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@ -21,15 +19,15 @@ import org.springframework.stereotype.Component;
import java.io.*;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.text.DecimalFormat;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
@ -50,221 +48,29 @@ public class FileUtils {
@Autowired
private S3ObjectStore s3ObjectStore;
@Autowired
private FileDecompressor fileDecompressor;
@Value("${services.pdfaggregation.worker.port}")
private String workerPort;
public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful}
public String baseFilesLocation;
public static final String workingDir = System.getProperty("user.dir") + File.separator;
private final boolean isTestEnvironment;
public FileUtils (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation, @Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment) {
if ( !baseFilesLocation.endsWith(File.separator) )
baseFilesLocation += File.separator;
if ( !baseFilesLocation.startsWith(File.separator) )
baseFilesLocation = workingDir + baseFilesLocation;
this.baseFilesLocation = baseFilesLocation;
public FileUtils (@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment) {
this.isTestEnvironment = isTestEnvironment;
}
public static final DecimalFormat df = new DecimalFormat("0.00");
// The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension".
// Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf"
public static final Pattern FILEPATH_ID_EXTENSION = Pattern.compile("([^.()]+/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public static final ExecutorService hashMatchingExecutor = Executors.newFixedThreadPool(6);
// TODO - Unify this ExecutorService with the hash-matching executorService. Since one will ALWAYS be called after the other. So why having two ExecServices to handle?
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, int sizeOfUrlReports, long assignmentsBatchCounter, String workerId) throws RuntimeException
{
// The Controller has to request the files from the Worker, in order to upload them to the S3.
// We UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
String workerIp = null;
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
logger.error("Could not retrieve the info of worker: " + workerId);
return null;
} else
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
int numValidFullTextsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
int numFullTextsWithProblematicLocations = 0;
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
HashMultimap<String, Payload> hashesWithPayloads = getHashesWithPayloads(urlReports, sizeOfUrlReports); // Holds multiple payloads for the same fileHash.
Set<String> fileHashes = hashesWithPayloads.keySet();
int fileHashesSetSize = fileHashes.size(); // Get the size of the keysSet, instead of the whole multimap.
if ( fileHashesSetSize == 0 ) {
logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\".");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
HashMap<String, String> hashLocationMap = getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments");
for ( String fileHash : fileHashes )
{
for ( Payload payload : hashesWithPayloads.get(fileHash) )
{
String alreadyFoundFileLocation = hashLocationMap.get(fileHash); // Only one location has been retrieved per fileHash.
if ( alreadyFoundFileLocation != null ) {
// Fill the payloads with locations from the "previously-found-hashes."
payload.setLocation(alreadyFoundFileLocation);
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numValidFullTextsFound ++; // We trust the location being valid..
}
else { // This file has not been found before..
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
String fileLocation = payload.getLocation();
Matcher matcher = FILEPATH_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
numValidFullTextsFound ++;
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
}
}
}
if ( numFullTextsWithProblematicLocations > 0 )
logger.warn(numFullTextsWithProblematicLocations + " files had problematic names.");
if ( numValidFullTextsFound == 0 ) {
logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either.
}
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts is lower than the number of payloads, since multiple payloads may lead to the same file.
int numFullTextsToBeRequested = allFileNames.size();
if ( numFullTextsToBeRequested == 0 ) {
logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%).");
// TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
// Request the full-texts in batches, compressed in a zstd tar file.
int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
} else
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Check if one full text is left out because of the division. Put it in the last batch.
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// TODO - The worker should send the port in which it accepts requests, along with the current request.
// TODO - The least we can do is to expose the port-assignment somewhere more obvious, like inside the "application.yml" file.
String curAssignmentsBaseLocation = baseFilesLocation + "assignments_" + assignmentsBatchCounter + File.separator;
// Note: the "curAssignmentsBaseLocation"-directory will be created once the first batch subdirectory is called for creation.
int failedBatches = 0;
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
// TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1,5 mins after using the Zstandard compression) for the worker to zstd the files and return them FOR EACH BATCH
// Also it takes around 3 mins for the Controller to process the received files FOR EACH BATCH
// So, for 24 batches, it takes around 24 * 2 * 3 = 144 mins to process all the full-texts for each assignments-batch.
// TODO - What if we passed a new thread to each full-texts-batch and made them "FIRE" one after the other?
// TODO - So, the 1st thread with the first batch starts and waits for the first zstd file from the worker.
// Once it gets the zstd file it continues, but now the Worker is just sitting idle. So the 2nd thread fires and requests the next zstd.
// So, the worker will never be left waiting and the Controller will not wait for the Worker either!
// The worker will not have 2 parallel requests for zstd files, so the single CPU there will not be stressed to zstd many files in parallel.
// Yes, the Controller may end up receiving the next zstd from the Worker before it finishes uploading the previously received files to S3.
// TODO - BUT, we can make the new thread "WAIT" for the previous to finish.
String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
Path curBatchPath;
try { // Create this batch-directory.
curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// The base-directory will be created along with the first batch-directory.
} catch (Exception e) {
logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
failedBatches ++;
continue;
}
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter);
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
try {
if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
failedBatches ++;
continue;
}
} catch (RuntimeException re) {
failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. Some initial batches may have succeeded.
break;
}
if ( ! decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) )
failedBatches ++;
} // End of batches.
if ( failedBatches == numOfBatches )
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
removeUnretrievedFullTextsFromUrlReports(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
deleteDirectory(new File(curAssignmentsBaseLocation));
// Check and warn about the number of failed payloads.
// Possible reasons: failed to check their hash in the DB, the file was not found inside the worker, the whole batch failed to be delivered from the worker, or files failed to be uploaded to S3.
// Retrieve the payloads from the existing urlReports.
long finalPayloadsCounter = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.count();
int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations);
long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
if ( numFailedPayloads == numInitialPayloads ) {
// This will also be the case if there was no DB failure, but all the batches have failed.
logger.error("None of the " + numInitialPayloads + " payloads could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return UploadFullTextsResponse.unsuccessful;
} else if ( numFailedPayloads > 0 )
logger.warn(numFailedPayloads + " payloads (out of " + numInitialPayloads + ") failed to be processed for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return UploadFullTextsResponse.successful;
}
public HashMultimap<String, Payload> getHashesWithPayloads(List<UrlReport> urlReports, int sizeOfUrlReports)
{
HashMultimap<String, Payload> hashesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple payloads for the same fileHash.
@ -335,113 +141,20 @@ public class FileUtils {
}
private boolean getAndSaveFullTextBatch(List<String> fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches,
String zstdFileFullPath, String workerId) throws RuntimeException
{
HttpURLConnection conn;
try {
if ( (conn = getConnectionForFullTextBatch(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
return false;
} catch (RuntimeException re) {
// The "cause" was logged inside "getConnection()".
throw re;
}
// Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url.
return saveArchive(conn, new File(zstdFileFullPath));
// We do not call "conn.disconnect()", since more request are about to be made to the worker, in the near future.
}
private boolean decompressAndUploadFullTexts(String zstdFileFullPath, Path curBatchPath, String targetDirectory, List<String> fileNamesForCurBatch, int batchCounter,
SetMultimap<String, Payload> allFileNamesWithPayloads, long assignmentsBatchCounter)
{
try {
fileDecompressor.decompressFiles(zstdFileFullPath, curBatchPath);
String[] extractedFileNames = new File(targetDirectory).list();
if ( (extractedFileNames == null) || (extractedFileNames.length <= 2) ) { // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred..
logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory);
return false;
} else if ( (extractedFileNames.length - 2) != fileNamesForCurBatch.size() ) {
logger.warn("The number of extracted files (" + (extractedFileNames.length - 2) + ") was not equal to the number of files (" + fileNamesForCurBatch.size() + ") of the current batch_" + batchCounter);
// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
// since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null.
}
uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads, batchCounter);
return true;
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
return false;
} finally {
deleteDirectory(curBatchPath.toFile());
}
}
private HttpURLConnection getConnectionForFullTextBatch(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
{
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
//logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
try {
HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("User-Agent", "UrlsController");
// TODO - Write the fileNames in the RequestBody, so that we can include as many as we want in each request.
// Right now, we can include only up to 70-80 fileNames in the url-string.
// TODO - We need to add the fileNames in the requestBody BEFORE we connect. So we will need to refactor the code to work in that order.
/*OutputStream os = conn.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8);
osw.write(fileNamesForCurBatch_separated_by_comma);
osw.flush();
osw.close();
os.close();*/
// TODO - The above update will also enable us to improve edge-case management, like making sure we do not create a whole new batch for just a few files.
// Check this case for example, where we have one extra batch with the network, compression-decompression, transfer, uploading etc. overhead:
// 2023-02-06 12:17:26.893 [http-nio-1880-exec-8] DEBUG e.o.urls_controller.util.FileUtils.getAndUploadFullTexts(@235) - The assignments_12 have 211 distinct non-already-uploaded fullTexts.
// Going to request them from the Worker "worker_X", in 4 batches (70 files each, except for the final batch, which will have 1 files).
// If we are not limited by the url-length we can easily say that if less than 10 files remain for the last batch, then add them to the previous batch (eg. the last batch will have 79 files)
// If equal to 10 or more files remain, then we will make an extra batch.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == -1 ) { // Invalid HTTP-Response.
logger.warn("Problem when getting the \"status-code\" for url: " + requestUrl);
throw new RuntimeException(); // Avoid any other batches.
} else if ( statusCode != 200 ) {
String errMsg = getMessageFromResponseBody(conn, true);
logger.warn("HTTP-" + statusCode + ": " + errMsg + "\n\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
if ( ((statusCode >= 500) && (statusCode <= 599))
|| ((statusCode == 400) && ((errMsg != null) && errMsg.contains("The base directory for assignments_" + assignmentsBatchCounter + " was not found"))) )
throw new RuntimeException(); // Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well.
return null;
} else
return conn;
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
String exMessage = e.getMessage();
logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + GenericUtils.endOfLine + exMessage);
if ( exMessage.contains("Connection refused") ) {
logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!");
throw new RuntimeException();
}
return null;
}
}
private void uploadFullTexts(String[] fileNames, String targetDirectory, SetMultimap<String, Payload> allFileNamesWithPayloads, int batchCounter)
void uploadFullTexts(String[] fileNames, String targetDirectory, SetMultimap<String, Payload> allFileNamesWithPayloads, int batchCounter)
{
// Iterate over the files and upload them to S3.
// TODO - Make the uploads run in parallel, using at most 4 threads.
// There might also be uploads from other assignment-batches (from other workers) running at the same time.
// But still, there are currently at most 3 (?) upload operations at the same time, each using 1 thread.
// It's most common to have just 1 or 2 of them. So each can easily use 2 or 4 threads to upload its own files.
//int numUploadedFiles = 0;
for ( String fileName : fileNames )
{
if ( fileName.contains(".tar") ) // Exclude the tar-files from uploading (".tar" and ".tar.zstd").
// Check if any of the ".tar" files (".tar" and ".tar.zstd") are still in the directory (in case their deletion failed) and exclude them from being uploaded.
if ( fileName.contains(".tar") )
continue;
// Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
@ -516,8 +229,7 @@ public class FileUtils {
// Else, the record will have its file-data set to "null", in the end of the caller method (as it will not have an s3Url as its location).
}//end filenames-for-loop
//logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -2) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
// (fileNames.length -2) --> minus the zstd and the tar files.
//logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + fileNames.length + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
}
@ -583,46 +295,13 @@ public class FileUtils {
}
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
{
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch);
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
endingIndex = numAllFullTexts;
final List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
for ( int i = initialIndex; i < endingIndex; ++i ) {
try {
fileNamesOfCurBatch.add(allFileNames.get(i));
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for i=" + i + GenericUtils.endOfLine + ioobe.getMessage(), ioobe);
}
}
return fileNamesOfCurBatch;
}
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
{
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size();
for ( int j=0; j < numFullTextsCurBatch; ++j ){
sb.append(fileNamesForCurBatch.get(j));
if ( j < (numFullTextsCurBatch -1) )
sb.append(",");
}
return sb.toString();
}
public static final int twentyFiveKb = 25_600; // 25 Kb
public static final int halfMb = 524_288; // 0.5 Mb = 512 Kb = 524_288 bytes
public static final int tenMb = (10 * 1_048_576);
public boolean saveArchive(HttpURLConnection conn, File zstdFile)
public boolean saveArchive(InputStream inputStream, File zstdFile)
{
try ( BufferedInputStream inStream = new BufferedInputStream(conn.getInputStream(), tenMb);
try ( BufferedInputStream inStream = new BufferedInputStream(inputStream, tenMb);
BufferedOutputStream outStream = new BufferedOutputStream(Files.newOutputStream(zstdFile.toPath()), tenMb) )
{
int readBytes;
@ -637,36 +316,6 @@ public class FileUtils {
}
/**
* This method updates the UrlReports to not point to any downloaded fullText files.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted.
* If the "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file uploaded to the S3-ObjectStore, are excluded.
* @param urlReports
* @param shouldCheckAndKeepS3UploadedFiles
*/
public void removeUnretrievedFullTextsFromUrlReports(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles)
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
if ( shouldCheckAndKeepS3UploadedFiles ) {
String fileLocation = payload.getLocation();
if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) )
continue;
}
// Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now.
payload.setLocation(null); // This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted.
payload.setHash(null);
payload.setMime_type(null);
payload.setSize(null);
// The id-url record will be called as a new assignment in the future.
}
}
/**
* This method searches the backlog of publications for the ones that have the same "original_url" or their "original_url" is equal to the "actual_url" or an existing payload.
* Then, it generates a new "UrlReport" object, which has as a payload a previously generated one, which has different "id", "original_url".
@ -724,19 +373,14 @@ public class FileUtils {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
return; // Move to the next payload.
}
Set<Payload> foundPayloads = urlToPayloadsMultimap.get(original_url);
if ( foundPayloads == null ) {
Set<Payload> foundPayloads = urlToPayloadsMultimap.get(original_url); // It doesn't return null on error, but an empty set.
if ( foundPayloads.size() == 0 ) {
logger.error("No payloads associated with \"original_url\" = \"" + original_url + "\"!");
return;
}
// Select a random "foundPayload" to use its data to fill the "prefilledPayload" (in a "Set" the first element is pseudo-random).
Optional<Payload> optPayload = foundPayloads.stream().findFirst();
if ( !optPayload.isPresent() ) {
logger.error("Could not retrieve any payload for the \"original_url\": " + original_url);
return; // Move to the next payload.
}
Payload prefilledPayload = optPayload.get().clone(); // We take a clone of the original payload and change just the id and the original_url.
// Since the "foundPayloads" is not-empty at this point, the, we do not have to check if the first element is present.
Payload prefilledPayload = foundPayloads.stream().findFirst().get().clone(); // We take a clone of the original payload and change just the id and the original_url.
prefilledPayload.setId(id);
prefilledPayload.setOriginal_url(original_url);
prefilledPayloads.add(prefilledPayload);
@ -815,9 +459,15 @@ public class FileUtils {
public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
{
if ( stringToWrite == null )
return "The string to write to file '" + fileFullPath + "' is null!";
if ( shouldLockThreads ) // In case multiple threads write to the same file. for ex. during the bulk-import procedure.
fileAccessLock.lock();
// TODO - Make this method synchronize on the specific file, not globally (see the per-file-lock sketch after this file's diff).
// TODO - Right NOW, multiple bulkImport procedures (with different directories) are blocked while writing to DIFFERENT files.
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), halfMb) )
{
bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter.

View File

@ -0,0 +1,403 @@
package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
@Component
public class FilesHandler {
private static final Logger logger = LoggerFactory.getLogger(FilesHandler.class);
@Autowired
private FileUtils fileUtils;
@Autowired
private FileDecompressor fileDecompressor;
@Autowired
private S3ObjectStore s3ObjectStore;
@Value("${services.pdfaggregation.worker.port}")
private String workerPort;
public static final DecimalFormat df = new DecimalFormat("0.00");
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public String baseFilesLocation;
public FilesHandler (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation) {
if ( !baseFilesLocation.endsWith(File.separator) )
baseFilesLocation += File.separator;
if ( !baseFilesLocation.startsWith(File.separator) )
baseFilesLocation = FileUtils.workingDir + baseFilesLocation;
this.baseFilesLocation = baseFilesLocation;
}
private HttpURLConnection getConnectionForFullTextBatch(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
{
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
//logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
try {
HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("User-Agent", "UrlsController");
// TODO - Write the fileNames in the RequestBody, so that we can include as many as we want in each request.
// Right now, we can include only up to 70-80 fileNames in the url-string.
// TODO - We need to add the fileNames in the requestBody BEFORE we connect, so we will need to refactor the code to work in that order (a request-body sketch follows this method).
/*OutputStream os = conn.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8);
osw.write(fileNamesForCurBatch_separated_by_comma);
osw.flush();
osw.close();
os.close();*/
// TODO - The above update will also enable us to improve edge-case management, like making sure we do not create a whole new batch for just a few files.
// Check this case for example, where we have one extra batch with the network, compression-decompression, transfer, uploading etc. overhead:
// 2023-02-06 12:17:26.893 [http-nio-1880-exec-8] DEBUG e.o.urls_controller.util.FileUtils.getAndUploadFullTexts(@235) - The assignments_12 have 211 distinct non-already-uploaded fullTexts.
// Going to request them from the Worker "worker_X", in 4 batches (70 files each, except for the final batch, which will have 1 files).
// If we are not limited by the url-length we can easily say that if less than 10 files remain for the last batch, then add them to the previous batch (eg. the last batch will have 79 files)
// If equal to 10 or more files remain, then we will make an extra batch.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == -1 ) { // Invalid HTTP-Response.
logger.warn("Problem when getting the \"status-code\" for url: " + requestUrl);
throw new RuntimeException(); // Avoid any other batches.
} else if ( statusCode != 200 ) {
String errMsg = fileUtils.getMessageFromResponseBody(conn, true);
logger.warn("HTTP-" + statusCode + ": " + errMsg + "\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
if ( ((statusCode >= 500) && (statusCode <= 599))
|| ((statusCode == 400) && ((errMsg != null) && errMsg.contains("The base directory for assignments_" + assignmentsBatchCounter + " was not found"))) )
throw new RuntimeException(); // Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well.
return null;
} else
return conn;
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
String exMessage = e.getMessage();
logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + GenericUtils.endOfLine + exMessage);
if ( exMessage.contains("Connection refused") ) {
logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!");
throw new RuntimeException();
}
return null;
}
}
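
The commented-out block inside getConnectionForFullTextBatch() above sketches moving the fileNames from the URL into the request body, which would lift the 70-80 files-per-batch limit imposed by the URL length. A hedged sketch of that idea with HttpURLConnection (the POST endpoint is hypothetical; the Worker currently serves these batches via GET):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;

class FullTextBatchRequestSketch {
    static HttpURLConnection requestBatch(String requestUrl, List<String> fileNames) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("User-Agent", "UrlsController");
        conn.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.setDoOutput(true); // must be enabled before the body is written
        try ( OutputStream os = conn.getOutputStream() ) {
            os.write(String.join(",", fileNames).getBytes(StandardCharsets.UTF_8)); // fileNames go in the body, not the URL
        }
        int statusCode = conn.getResponseCode(); // sends the request and reads the status
        if ( statusCode != 200 )
            throw new RuntimeException("HTTP-" + statusCode + " when requesting the full-texts batch.");
        return conn;
    }
}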
public FileUtils.UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, int sizeOfUrlReports, long assignmentsBatchCounter, String workerId) throws RuntimeException
{
// The Controller has to request the files from the Worker, in order to upload them to the S3.
// We UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
String workerIp = null;
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
logger.error("Could not retrieve the info of worker: " + workerId);
return null;
} else
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
int numValidFullTextsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
int numFullTextsWithProblematicLocations = 0;
HashMultimap<String, Payload> hashesWithPayloads = fileUtils.getHashesWithPayloads(urlReports, sizeOfUrlReports); // Holds multiple payloads for the same fileHash.
Set<String> fileHashes = hashesWithPayloads.keySet();
int fileHashesSetSize = fileHashes.size(); // Get the size of the keysSet, instead of the whole multimap.
if ( fileHashesSetSize == 0 ) {
logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\".");
return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
HashMap<String, String> hashLocationMap = fileUtils.getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments");
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
for ( String fileHash : fileHashes )
{
for ( Payload payload : hashesWithPayloads.get(fileHash) )
{
String alreadyFoundFileLocation = hashLocationMap.get(fileHash); // Only one location has been retrieved per fileHash.
if ( alreadyFoundFileLocation != null ) {
// Fill the payloads with locations from the "previously-found-hashes."
payload.setLocation(alreadyFoundFileLocation);
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numValidFullTextsFound ++; // We trust the location being valid..
}
else { // This file has not been found before..
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
String fileLocation = payload.getLocation();
Matcher matcher = FileUtils.FILEPATH_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FileUtils.FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FileUtils.FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
numValidFullTextsFound ++;
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
}
}
}
if ( numFullTextsWithProblematicLocations > 0 )
logger.warn(numFullTextsWithProblematicLocations + " files had problematic names.");
if ( numValidFullTextsFound == 0 ) {
logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either.
}
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts is lower than the number of payloads, since multiple payloads may lead to the same file.
int numFullTextsToBeRequested = allFileNames.size();
if ( numFullTextsToBeRequested == 0 ) {
logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%).");
// TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
// Request the full-texts in batches, compressed in a zstd tar file.
int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
} else
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Any full-texts left over by the division have already been accounted for, by the extra (final) batch above.
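// For example: numFullTextsToBeRequested = 2,500 with numOfFullTextsPerBatch = 1,000 gives numOfBatches = (2 + 1) = 3 batches (two full batches and a final one with the remaining 500 files).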
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// TODO - The worker should send the port in which it accepts requests, along with the current request.
String curAssignmentsBaseLocation = baseFilesLocation + "assignments_" + assignmentsBatchCounter + File.separator;
// Note: the "curAssignmentsBaseLocation"-directory will be created once the first batch subdirectory is called for creation.
int failedBatches = 0;
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
// TODO - Currently, for big assignments (e.g. 10,000), it takes ~2 mins (~1.5 mins after switching to Zstandard compression) for the worker to compress the files and return them, FOR EACH BATCH.
//  It also takes around 3 mins for the Controller to process the received files, FOR EACH BATCH.
//  So, for 24 batches, it takes around 24 * (2 + 3) = 120 mins to process all the full-texts of an assignments-batch.
// TODO - What if we spawned a new thread for each full-texts-batch and made them "FIRE" one after the other?
//  The 1st thread starts with the first batch and waits for the first zstd file from the worker.
//  Once it receives the zstd file it continues, but then the Worker would just sit waiting; so the 2nd thread fires and requests the next zstd.
//  This way, the Worker is never left waiting and the Controller does not wait for the Worker either..!
//  The Worker never serves 2 parallel requests for zstd files, so its single CPU is not stressed by compressing many files in parallel.
//  The Controller, though, may receive the next zstd from the Worker before it has finished uploading the previously received files to S3.
// TODO - BUT, we can make each new thread "WAIT" for the previous one to finish, as sketched below.
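// A rough sketch of the pipelining idea above (illustrative only, not part of the current implementation).
// It assumes "java.util.concurrent" imports; "requestAndSaveArchiveForBatch()" and "processBatchArchive()" are hypothetical helpers standing in for the request/save and decompress/upload steps.
/*
ExecutorService batchProcessor = Executors.newSingleThreadExecutor(); // A single thread serializes the processing, so at most one batch is processed at a time.
Future<Boolean> previousBatch = null;
try {
for ( int curBatch = 1; curBatch <= numOfBatches; ++curBatch ) {
File zstdArchive = requestAndSaveArchiveForBatch(curBatch); // Blocks until the Worker returns the zstd archive of this batch.
if ( previousBatch != null )
previousBatch.get(); // Make this batch "WAIT" for the previous one to finish being decompressed and uploaded.
final File archiveToProcess = zstdArchive;
previousBatch = batchProcessor.submit(() -> processBatchArchive(archiveToProcess)); // Process in the background, while the loop requests the next archive from the Worker.
}
if ( previousBatch != null )
previousBatch.get(); // Wait for the last batch to finish.
} catch (InterruptedException | ExecutionException e) {
logger.error("The pipelined batch-processing failed!", e);
} finally {
batchProcessor.shutdown();
}
*/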
String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
Path curBatchPath;
try { // Create this batch-directory.
curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// The base-directory will be created along with the first batch-directory.
} catch (Exception e) {
logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
failedBatches ++;
continue;
}
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter);
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
try {
if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
failedBatches ++;
continue;
}
} catch (RuntimeException re) {
failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" count includes the previously failed batches, plus this one, plus the remaining batches, which will not be attempted and would likely fail too. Some earlier batches may have succeeded.
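// For example, with numOfBatches = 24: if the RuntimeException occurs at batchCounter = 10 and 2 earlier batches had already failed, then failedBatches = 2 + 1 + (24 - 10) = 17.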
break;
}
if ( ! fileDecompressor.decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) )
failedBatches ++;
} // End of batches.
if ( failedBatches == numOfBatches )
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
removeUnretrievedFullTextsFromUrlReports(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
fileUtils.deleteDirectory(new File(curAssignmentsBaseLocation));
// Check and warn about the number of failed payloads.
// Possible reasons: the hash-check in the DB failed, the file was not found on the worker, a whole batch failed to be delivered by the worker, or files failed to be uploaded to S3.
// Retrieve the payloads from the existing urlReports.
long finalPayloadsCounter = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.count();
int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations);
long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
if ( numFailedPayloads == numInitialPayloads ) {
// This will also be the case if there was no DB failure, but all the batches have failed.
logger.error("None of the " + numInitialPayloads + " payloads could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return FileUtils.UploadFullTextsResponse.unsuccessful;
} else if ( numFailedPayloads > 0 )
logger.warn(numFailedPayloads + " payloads (out of " + numInitialPayloads + ") failed to be processed for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return FileUtils.UploadFullTextsResponse.successful;
}
private boolean getAndSaveFullTextBatch(List<String> fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches,
String zstdFileFullPath, String workerId) throws RuntimeException
{
HttpURLConnection conn;
InputStream inputStream;
try {
if ( (conn = getConnectionForFullTextBatch(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
return false;
inputStream = conn.getInputStream();
} catch (RuntimeException re) {
// The "cause" was logged inside "getConnection()".
throw re;
} catch (IOException ioe) {
throw new RuntimeException(ioe.getMessage());
}
// Save the zstd archive to disk; it will be decompressed and its full-texts uploaded to S3 afterwards (see "decompressAndUploadFullTexts()").
return fileUtils.saveArchive(inputStream, new File(zstdFileFullPath));
// We do not call "conn.disconnect()", since more request are about to be made to the worker, in the near future.
//private static final RestTemplate restTemplate = new RestTemplate();
//baseUrl += batchCounter + "/";
//String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
// Define a response extractor that returns an input stream
//ResponseExtractor<InputStream> responseExtractor = HttpInputMessage::getBody;
/*ResponseExtractor<InputStream> responseExtractor = new ResponseExtractor<InputStream>() {
@Override
public InputStream extractData(ClientHttpResponse response) throws IOException {
return response.getBody();
}
};
try {
//ResponseEntity<Object> responseEntity = restTemplate.exchange(requestUrl, HttpMethod.GET, null, Object.class);
InputStream inputStream = restTemplate.exchange(requestUrl, HttpMethod.GET, null, responseExtractor);
//InputStream inputStream = (InputStream) responseEntity.getBody();
return saveArchive(inputStream, new File(zstdFileFullPath));
} catch (Exception e) {
logger.error("Could not get the file from the response body!", e);
return false;
}*/
}
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
{
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size();
for ( int j=0; j < numFullTextsCurBatch; ++j ){
sb.append(fileNamesForCurBatch.get(j));
if ( j < (numFullTextsCurBatch -1) )
sb.append(",");
}
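// At this point, "sb" holds a URL like the following (hypothetical values): "http://10.0.5.1:1881/api/full-texts/getFullTexts/42/24/3/ID-1.pdf,ID-2.pdf",
// given baseUrl = "http://10.0.5.1:1881/api/full-texts/getFullTexts/42/24/3/" and fileNamesForCurBatch = ["ID-1.pdf", "ID-2.pdf"].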
return sb.toString();
}
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
{
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch);
if ( endingIndex > numAllFullTexts ) // This happens for the last batch, when "numAllFullTexts" is not an exact multiple of "numOfFullTextsPerBatch".
endingIndex = numAllFullTexts;
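// For example, curBatch = 3 with numOfFullTextsPerBatch = 1,000 selects the fileNames at indices 2,000 .. 2,999 (or fewer, up to "numAllFullTexts", for the last batch).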
final List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
for ( int i = initialIndex; i < endingIndex; ++i ) {
try {
fileNamesOfCurBatch.add(allFileNames.get(i));
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for i=" + i + GenericUtils.endOfLine + ioobe.getMessage(), ioobe);
}
}
return fileNamesOfCurBatch;
}
/**
* This method updates the given UrlReports, so that they no longer point to any locally downloaded fullText files.
* This is useful when the process of uploading the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted.
* If "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file already uploaded to the S3-ObjectStore are excluded.
* @param urlReports the list of url-reports whose payloads will have their file-related data cleared
* @param shouldCheckAndKeepS3UploadedFiles whether payloads whose files are already in the S3-ObjectStore should keep their file-data
*/
public void removeUnretrievedFullTextsFromUrlReports(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles)
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
if ( shouldCheckAndKeepS3UploadedFiles ) {
String fileLocation = payload.getLocation();
if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) )
continue;
}
// Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now.
payload.setLocation(null); // This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted.
payload.setHash(null);
payload.setMime_type(null);
payload.setSize(null);
// The id-url record will be handed out as a new assignment in the future.
}
}
}

View File

@ -13,6 +13,7 @@ public class GenericUtils {
public static final String endOfLine = "\n";
public static final String tab = "\t";
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z");
@ -44,12 +45,13 @@ public class GenericUtils {
public static String getSelectiveStackTrace(Throwable thr, String initialMessage, int numOfLines)
{
StackTraceElement[] stels = thr.getStackTrace();
StringBuilder sb = new StringBuilder(numOfLines *100);
StringBuilder sb = new StringBuilder(numOfLines *100); // This StringBuilder is thread-safe as a local-variable.
if ( initialMessage != null )
sb.append(initialMessage).append(GenericUtils.endOfLine).append("Stacktrace:").append(GenericUtils.endOfLine); // This StringBuilder is thread-safe as a local-variable.
for ( int i = 0; (i < stels.length) && (i <= numOfLines); ++i ) {
sb.append(stels[i]);
if (i < numOfLines) sb.append(GenericUtils.endOfLine);
sb.append(initialMessage).append(endOfLine);
sb.append("Stacktrace:").append(endOfLine);
for ( int i = 0; (i < stels.length) && (i < numOfLines); ++i ) {
sb.append(tab).append(stels[i]);
if (i < (numOfLines -1)) sb.append(endOfLine);
}
return sb.toString();
}
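// Usage example (hypothetical call-site): getSelectiveStackTrace(e, "Error when processing assignments_5!", 20)
// returns the initial message, a "Stacktrace:" line and up to the first 20 tab-indented stack-frames, separated by new-lines.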