diff --git a/README.md b/README.md
index 8003d22..3aa699b 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,10 @@ Then it receives the "WorkerReports" and writes them into the database.
The database used is [Impala](https://impala.apache.org/).
[...]
-To install and run the application, run ```git clone``` and then execute the ```installAndRun.sh``` script.
+To install and run the application, run ```git clone```.
+Then, provide an *"S3_minIO_credentials.txt"* file inside the *working directory*.
+In the *"S3_minIO_credentials.txt"* file, you should provide the *endpoint*, the *accessKey*, the *secretKey*, the *region* and the *bucket*, in that order, separated by commas.
+
+Afterwards, execute the ```installAndRun.sh``` script.
If you want to just run the app, then run the script with the argument "1": ```./installAndRun.sh 1```.
diff --git a/build.gradle b/build.gradle
index 5757916..1f9aa0f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -43,6 +43,11 @@ dependencies {
// https://mvnrepository.com/artifact/com.google.guava/guava
implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
+ implementation 'io.minio:minio:8.3.3'
+
+ // https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp
+ implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3' // This is required by minio, as Spring manages an okhttp version which is not supported by minio.
+
// https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
implementation group: 'com.cloudera.impala', name: 'jdbc', version: '2.5.31'
diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java
index 9dbc78e..e850acf 100644
--- a/src/main/java/eu/openaire/urls_controller/Application.java
+++ b/src/main/java/eu/openaire/urls_controller/Application.java
@@ -2,6 +2,7 @@ package eu.openaire.urls_controller;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
+import eu.openaire.urls_controller.util.S3ObjectStoreMinIO;
import eu.openaire.urls_controller.util.UriBuilder;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
@@ -22,6 +23,7 @@ import java.util.Collections;
public class Application {
public static void main(String[] args) {
+ new S3ObjectStoreMinIO();
SpringApplication.run(Application.class, args);
}
diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java
index 2e02835..3a6b0ab 100644
--- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java
+++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java
@@ -13,6 +13,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
+import javax.servlet.http.HttpServletRequest;
import java.sql.*;
import java.sql.Date;
@@ -145,11 +146,7 @@ public class UrlController {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
logger.error(errorMsg, e);
- try {
- con.close();
- } catch (SQLException sqle2) {
- logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
- }
+ ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
@@ -164,11 +161,7 @@ public class UrlController {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
logger.error(errorMsg);
- try {
- con.close();
- } catch (SQLException sqle2) {
- logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
- }
+ ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
@@ -240,7 +233,7 @@ public class UrlController {
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
- String mergeErrorMsg = mergeParquetFiles("assignment", con);
+ String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
closePreparedStatements(preparedInsertAssignmentStatement, null, con);
@@ -265,7 +258,7 @@ public class UrlController {
@PostMapping("addWorkerReport")
- public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport) {
+ public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
@@ -280,8 +273,14 @@ public class UrlController {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
- logger.info("Received the WorkerReport for batch_ " + workerReport.getAssignmentRequestCounter() + ", from the worker with id: " + workerReport.getWorkerId() + ". It contains " + urlReports.size() + " urlReports. Going to insert them into the database.");
+ logger.info("Received the WorkerReport for batch-assignments_" + workerReport.getAssignmentRequestCounter() + ", from the worker with id: " + workerReport.getWorkerId() + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
+ // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
+ if ( ! FileUtils.getAndUploadFullTexts(urlReports, request, assignmentsBatchCounter, workerReport.getWorkerId()) ) {
+ logger.error("Failed to get and/or upload the fullTexts for assignments_" + assignmentsBatchCounter);
+ // The docUrls were still found! Just update ALL the fileLocations, sizes and hashes, to show that the files are not available, and continue with writing the attempts and the Payloads.
+ FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
+ }
ImpalaConnector.databaseLock.lock();
@@ -384,14 +383,14 @@ public class UrlController {
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
- String mergeErrorMsg = mergeParquetFiles("payload", con);
+ String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
- mergeErrorMsg = mergeParquetFiles("attempt", con);
+ mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
@@ -415,50 +414,6 @@ public class UrlController {
}
- /**
- * In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so have to stick with it and merge those files..
- * This method, creates a clone of the original table in order to have only one parquet file in the end. Drops the original table.
- * Renames the clone to the original's name.
- * Returns the errorMsg, if an error appears, otherwise is returns "null".
- * */
- private static String mergeParquetFiles(String tableName, Connection con)
- {
- String errorMsg;
- if ( tableName == null ) {
- errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
- logger.error(errorMsg);
- return errorMsg;
- }
-
- Statement statement;
- try {
- statement = con.createStatement();
- } catch (SQLException sqle) {
- errorMsg = "Problem when creating a connection-statement!\n";
- logger.error(errorMsg + sqle.getMessage());
- return errorMsg;
- }
-
- try {
- statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName);
- statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
- statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
- statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
- } catch (SQLException sqle) {
- errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
- logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()));
- sqle.printStackTrace();
- return errorMsg;
- } finally {
- // Make sure we close the statement.
- try { statement.close(); }
- catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
- }
-
- return null; // No errorMsg, everything is fine.
- }
-
-
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
@@ -489,17 +444,6 @@ public class UrlController {
}
- private static String getCutBatchExceptionMessage(String sqleMessage)
- {
- // The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging.
- int maxEnding = 1500;
- if ( sqleMessage.length() > maxEnding )
- return (sqleMessage.substring(0, maxEnding) + "...");
- else
- return sqleMessage;
- }
-
-
private boolean closePreparedStatements(PreparedStatement preparedStatement1, PreparedStatement preparedStatement2, Connection con) {
try {
if ( preparedStatement1 != null )
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java b/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java
new file mode 100644
index 0000000..678e7e0
--- /dev/null
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java
@@ -0,0 +1,61 @@
+package eu.openaire.urls_controller.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+
+public class FileUnZipper {
+
+ private static final Logger logger = LoggerFactory.getLogger(FileUnZipper.class);
+
+
+ public static void unzipFolder(Path source, Path target) throws Exception
+ {
+ try (ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile())))
+ {
+ // Iterate over the files in zip and un-zip them.
+ ZipEntry zipEntry = zis.getNextEntry();
+ while ( zipEntry != null )
+ {
+ Path targetPath = zipSlipProtect(zipEntry, target);
+
+ if ( zipEntry.getName().endsWith(File.separator) ) // If we have a directory.
+ Files.createDirectories(targetPath);
+ else {
+ // Some zip stored file path only, need create parent directories, e.g data/folder/file.txt
+ if ( targetPath.getParent() != null ) {
+ if ( Files.notExists(targetPath.getParent()) ) {
+ Files.createDirectories(targetPath.getParent());
+ }
+ }
+ Files.copy(zis, targetPath, StandardCopyOption.REPLACE_EXISTING);
+ }
+ zipEntry = zis.getNextEntry();
+ }
+ zis.closeEntry();
+ }
+ }
+
+
+ // Protect from a Zip Slip attack: https://snyk.io/research/zip-slip-vulnerability
+ public static Path zipSlipProtect(ZipEntry zipEntry, Path targetDir) throws IOException
+ {
+ Path targetDirResolved = targetDir.resolve(zipEntry.getName());
+ // Make sure normalized file still has targetDir as its prefix, else throw an exception.
+ Path normalizePath = targetDirResolved.normalize();
+ if ( !normalizePath.startsWith(targetDir) ) {
+ throw new IOException("Bad zip entry: " + zipEntry.getName());
+ }
+ return normalizePath;
+ }
+
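+ /* A usage sketch (the paths are hypothetical; in this project the unzipping is driven by "FileUtils.getAndUploadFullTexts()"):
+ *
+ * Path zipFilePath = Paths.get("/tmp/fullTexts_42_1.zip");
+ * Path targetDir = Files.createDirectories(Paths.get("/tmp/extracted_batch_1"));
+ * FileUnZipper.unzipFolder(zipFilePath, targetDir); // Throws an exception for a "zip-slip" entry.
+ */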
+}
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index 6a6ddbc..02249bc 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -1,18 +1,35 @@
package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
+import eu.openaire.urls_controller.configuration.ImpalaConnector;
+import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.Task;
+import eu.openaire.urls_controller.models.UrlReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.configurationprocessor.json.JSONObject;
+import javax.servlet.http.HttpServletRequest;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class FileUtils {
@@ -46,6 +63,394 @@ public class FileUtils {
}
+ /**
+ * In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so we have to stick with it and merge those files.
+ * This method creates a clone of the original table, in order to have only one parquet file in the end. It drops the original table
+ * and renames the clone to the original's name.
+ * Returns the errorMsg if an error appears; otherwise it returns "null".
+ * */
+ public static String mergeParquetFiles(String tableName, Connection con)
+ {
+ String errorMsg;
+ if ( tableName == null ) {
+ errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
+ logger.error(errorMsg);
+ return errorMsg;
+ }
+
+ Statement statement;
+ try {
+ statement = con.createStatement();
+ } catch (SQLException sqle) {
+ errorMsg = "Problem when creating a connection-statement!\n";
+ logger.error(errorMsg + sqle.getMessage());
+ return errorMsg;
+ }
+
+ try {
+ statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName);
+ statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
+ statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
+ statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
+ } catch (SQLException sqle) {
+ errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
+ logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()), sqle);
+ return errorMsg;
+ } finally {
+ // Make sure we close the statement.
+ try { statement.close(); }
+ catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
+ }
+
+ return null; // No errorMsg, everything is fine.
+ }
+
+
+ private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
+ private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
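+ // e.g. a payload-location like ".../some_dir/dedup_12::34ab.pdf" yields the fileName "dedup_12::34ab.pdf" and the file-ID "dedup_12::34ab" (illustrative values).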
+ public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
+ private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
+
+ public static boolean getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, AtomicLong assignmentsBatchCounter, String workerId)
+ {
+ // The Controller has to request the files from the Worker, in order to upload them to S3.
+ // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
+
+ if ( request == null ) {
+ logger.error("The \"HttpServletRequest\" is null!");
+ return false;
+ }
+ String remoteAddr = request.getHeader("X-FORWARDED-FOR");
+ if ( remoteAddr == null || "".equals(remoteAddr) )
+ remoteAddr = request.getRemoteAddr();
+
+ // Get the file-locations.
+ List<String> allFileNames = new ArrayList<>(urlReports.size()/2);
+ for ( UrlReport urlReport : urlReports ) {
+ UrlReport.StatusType statusType = urlReport.getStatus();
+ if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) ) {
+ continue;
+ }
+ Payload payload = urlReport.getPayload();
+ if ( payload != null ) {
+ String fileLocation = payload.getLocation();
+ if ( fileLocation != null ) { // If the docFile was downloaded (without an error)..
+ Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
+ if ( !matcher.matches() ) {
+ continue;
+ }
+ String fileNameWithExtension = matcher.group(1);
+ if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
+ continue;
+ }
+ allFileNames.add(fileNameWithExtension);
+ }
+ }
+ }
+
+ int numAllFullTexts = allFileNames.size();
+ if ( numAllFullTexts == 0 ) {
+ logger.warn("The file retrieved by the Worker where < 0 > for assignments_" + assignmentsBatchCounter);
+ return true; // It was handled, no error.
+ }
+
+ // Request the full-texts in batches, compressed in zip.
+ int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
+ if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 ) // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
+ numOfBatches ++;
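+ // e.g. 150 fullTexts with a batch-size of 70 give (150 / 70) = 2 batches, plus 1 extra for the remaining 10 files: 3 batches in total.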
+
+ logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " fullTexts. Going to request them from the Worker, in " + numOfBatches + " batches.");
+
+ // Check if one full text is left out because of the division. Put it in the last batch.
+ String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
+
+ // Index all UrlReports, so that they can be searched more efficiently later.
+ HashMap<String, Payload> payloadsHashMap = new HashMap<>(urlReports.size());
+ for ( UrlReport urlReport : urlReports ) {
+ Payload payload = urlReport.getPayload();
+ if ( payload != null )
+ payloadsHashMap.put(payload.getId(), payload);
+ }
+
+ String curAssignmentsBaseLocation = baseTargetLocation + "assignments_" + assignmentsBatchCounter + File.separator;
+ File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation);
+
+ int failedBatches = 0;
+ for ( int i=1; i <= numOfBatches; ++i )
+ {
+ List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, i, numOfBatches);
+ HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, i, fileNamesForCurBatch, numOfBatches, workerId);
+ if ( conn == null ) {
+ updateUrlReportsForCurBatchToHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch);
+ failedBatches ++;
+ continue; // To the next batch.
+ }
+
+ String targetLocation = curAssignmentsBaseLocation + "batch_" + i + File.separator;
+ File curBatchDir = new File(targetLocation);
+ try {
+ // Create the directory which will hold the extracted files of this batch.
+ Path targetPath = Files.createDirectories(Paths.get(targetLocation));
+
+ // Unzip the file. Iterate over the PDFs and upload each one of them and get the S3-Url
+ String zipFileFullPath = targetLocation + "fullTexts_" + assignmentsBatchCounter + "_" + i + ".zip";
+ File zipFile = new File(zipFileFullPath);
+
+ if ( ! saveZipFile(conn, zipFile) ) {
+ updateUrlReportsForCurBatchToHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch);
+ deleteDirectory(curBatchDir);
+ failedBatches ++;
+ continue; // To the next batch.
+ }
+
+ //logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG!
+
+ FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), targetPath);
+
+ String[] fileNames = curBatchDir.list();
+ if ( (fileNames == null) || (fileNames.length == 0) ) {
+ logger.error("No filenames where extracted from directory: " + targetLocation);
+ updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch);
+ deleteDirectory(curBatchDir);
+ failedBatches ++;
+ continue; // To the next batch.
+ }
+
+ // Iterate over the files and upload them to S3.
+ int numUploadedFiles = 0;
+ for ( String fileName : fileNames )
+ {
+ String fileFullPath = targetLocation + fileName;
+ if ( fileFullPath.equals(zipFileFullPath) ) { // Exclude the zip-file from uploading.
+ continue;
+ }
+ // Get the ID of the file.
+ Matcher matcher = FILENAME_ID.matcher(fileName);
+ if ( !matcher.matches() ) {
+ continue;
+ }
+ String id = matcher.group(1);
+ if ( (id == null) || id.isEmpty() ) {
+ continue;
+ }
+ Payload payload = payloadsHashMap.get(id);
+ if ( payload == null ) {
+ continue;
+ }
+ String location = payload.getLocation();
+ if ( location == null ) {
+ continue;
+ }
+ if ( ! location.endsWith(fileName) ) { // That should NEVER happen...
+ logger.error("The location \"" + location + "\" of the payload matched with the ID \"" + id + "\" is not ending with the filename it was supposed to \"" + fileName + "\"");
+ continue;
+ }
+
+ String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath);
+ if ( s3Url != null ) {
+ payload.setLocation(s3Url); // Update the file-location to the new S3-url.
+ numUploadedFiles ++;
+ } else
+ setUnretrievedFullText(payload);
+ }
+
+ logger.info("Finished uploading " + numUploadedFiles + " full-texts of assignments_" + assignmentsBatchCounter + " on S3-ObjectStore.");
+
+ } catch (Exception e) {
+ logger.error("Could not extract and upload the full-texts for batch_" + i + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
+ updateUrlReportsForCurBatchToHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch);
+ failedBatches ++;
+ } finally {
+ deleteDirectory(curBatchDir); // Delete the files of this batch (including the zip-file).
+ }
+ } // End of batches.
+
+ // Delete this assignments-num directory.
+ deleteDirectory(curAssignmentsBaseDir);
+
+ // Check if none of the batches were handled..
+ if ( failedBatches == numOfBatches ) {
+ logger.error("None of the " + numOfBatches + " batches could be handled!");
+ return false;
+ } else {
+ replaceNotUploadedFileLocations(urlReports);
+ return true;
+ }
+ }
+
+
+ private static HttpURLConnection getConnection(String baseUrl, AtomicLong assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId)
+ {
+ String requestUrl = getRequestUrlForBatch(baseUrl, batchNum, fileNamesForCurBatch);
+ logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
+ HttpURLConnection conn = null;
+ try {
+ conn = (HttpURLConnection) new URL(requestUrl).openConnection();
+ conn.setRequestMethod("GET");
+ conn.setRequestProperty("User-Agent", "UrlsController");
+ conn.connect();
+ int statusCode = conn.getResponseCode();
+ if ( statusCode != 200 ) {
+ logger.warn("HTTP-" + statusCode + ": Problem with when requesting the ZipFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
+ return null;
+ }
+ } catch (Exception e) {
+ logger.warn("Problem when requesting the ZipFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + e.getMessage());
+ return null;
+ }
+ return conn;
+ }
+
+
+ private static List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch, int numOfBatches)
+ {
+ int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
+ int endingIndex = (curBatch * numOfFullTextsPerBatch);
+ if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
+ endingIndex = numAllFullTexts;
+
+ List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
+ for ( int i = initialIndex; i < endingIndex; ++i ) {
+ try {
+ fileNamesOfCurBatch.add(allFileNames.get(i));
+ } catch (IndexOutOfBoundsException ioobe) {
+ logger.error("IOOBE for i=" + i + "\n" + ioobe.getMessage(), ioobe);
+ }
+ }
+ return fileNamesOfCurBatch;
+ }
+
+
+ private static final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 100);
+ // TODO - Make it THREAD-LOCAL, if we move to multi-thread batch requests.
+
+ private static String getRequestUrlForBatch(String baseUrl, int curBatch, List<String> fileNamesForCurBatch)
+ {
+ sb.append(baseUrl).append(curBatch).append("/");
+ int numFullTextsCurBatch = fileNamesForCurBatch.size();
+ for ( int j=0; j < numFullTextsCurBatch; ++j ){
+ sb.append(fileNamesForCurBatch.get(j));
+ if ( j < (numFullTextsCurBatch -1) )
+ sb.append(",");
+ }
+ String requestUrl = sb.toString();
+ sb.setLength(0); // Reset for the next batch.
+ return requestUrl;
+ }
+
+
+ /**
+ * This method updates the UrlReports to not point to any downloaded fullText files.
+ * This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails.
+ * Then, we don't want any "links" to locally stored files, which will be deleted.
+ * @param urlReports the list of UrlReports to be updated
+ */
+ public static void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports)
+ {
+ for ( UrlReport urlReport : urlReports ) {
+ Payload payload = urlReport.getPayload();
+ if ( payload != null )
+ setUnretrievedFullText(payload);
+ }
+ }
+
+
+ private static void replaceNotUploadedFileLocations(List<UrlReport> urlReports)
+ {
+ for ( UrlReport urlReport : urlReports ) {
+ Payload payload = urlReport.getPayload();
+ if ( payload != null ) {
+ String fileLocation = payload.getLocation();
+ if ( (fileLocation != null) && (! fileLocation.startsWith(S3ObjectStoreMinIO.endpoint)) )
+ setUnretrievedFullText(payload);
+ }
+ }
+ }
+
+
+ public static void updateUrlReportsForCurBatchToHaveNoFullTextFiles(HashMap<String, Payload> payloadsHashMap, List<String> fileNames)
+ {
+ for ( String fileName : fileNames ) {
+ // Get the ID of the file.
+ Matcher matcher = FILENAME_ID.matcher(fileName);
+ if ( !matcher.matches() ) {
+ continue;
+ }
+ String id = matcher.group(1);
+ if ( (id == null) || id.isEmpty() ) {
+ continue;
+ }
+ Payload payload = payloadsHashMap.get(id);
+ if ( payload != null )
+ setUnretrievedFullText(payload); // It changes the payload in the original UrlReport list.
+ }
+ }
+
+
+ public static void setUnretrievedFullText(Payload payload)
+ {
+ // Mark the full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text will be kept.
+ payload.setLocation(null);
+ payload.setHash(null);
+ payload.setMime_type(null);
+ payload.setSize(null);
+ }
+
+
+ private static final int bufferSize = 20971520; // 20 MB
+ public static boolean saveZipFile(HttpURLConnection conn, File zipFile)
+ {
+ FileOutputStream outStream = null;
+ InputStream inStream = null;
+ try {
+ inStream = conn.getInputStream();
+ outStream = new FileOutputStream(zipFile);
+
+ byte[] byteBuffer = new byte[bufferSize]; // 20 MB
+ int bytesRead = -1;
+ while ( (bytesRead = inStream.read(byteBuffer, 0, bufferSize)) != -1 ) {
+ outStream.write(byteBuffer, 0, bytesRead);
+ }
+ return true;
+ } catch (Exception e) {
+ logger.error("Could not save the zip file \"" + zipFile.getName() + "\": " + e.getMessage(), e);
+ return false;
+ } finally {
+ try {
+ if ( inStream != null )
+ inStream.close();
+ if ( outStream != null )
+ outStream.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+
+
+ public static boolean deleteDirectory(File curBatchDir) {
+ try {
+ org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
+ return true;
+ } catch (IOException e) {
+ logger.error("The following directory could not be deleted: " + curBatchDir.getName(), e);
+ return false;
+ }
+ }
+
+
+ private static String getCutBatchExceptionMessage(String sqleMessage)
+ {
+ // The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging.
+ int maxEnding = 1500;
+ if ( sqleMessage.length() > maxEnding )
+ return (sqleMessage.substring(0, maxEnding) + "...");
+ else
+ return sqleMessage;
+ }
+
+
// This is currently not used, but it may be useful in a future scenario.
private static long getInputFileLinesNum()
{
@@ -85,7 +490,6 @@ public class FileUtils {
logger.warn("The url was not found for id: \"" + idStr + "\"");
return null;
}
-
return new Task(idStr, urlStr, null);
}
@@ -166,6 +570,4 @@ public class FileUtils {
return false;
}
-
-
}
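To tie the pieces together, here is a condensed sketch of the report-processing flow that `addWorkerReport()` implements with these helpers (the names come from this diff; locking and error-handling are simplified):

```java
// Condensed sketch of the flow in UrlController.addWorkerReport() (locking/error-paths omitted).
if ( !FileUtils.getAndUploadFullTexts(urlReports, request, assignmentsBatchCounter, workerId) ) {
    // The docUrls were still found; only the file-related data gets cleared.
    FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
}

// ... insert the payloads and attempts into the database ...

String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con);
if ( mergeErrorMsg == null )
    mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con);
```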
diff --git a/src/main/java/eu/openaire/urls_controller/util/S3ObjectStoreMinIO.java b/src/main/java/eu/openaire/urls_controller/util/S3ObjectStoreMinIO.java
new file mode 100644
index 0000000..0b7ce66
--- /dev/null
+++ b/src/main/java/eu/openaire/urls_controller/util/S3ObjectStoreMinIO.java
@@ -0,0 +1,226 @@
+package eu.openaire.urls_controller.util;
+
+import io.minio.*;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.Scanner;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+public class S3ObjectStoreMinIO {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3ObjectStoreMinIO.class);
+
+ public static String endpoint = null; // This is kept public, so that file-locations can be tested against it.
+ private static String accessKey = null;
+ private static String secretKey = null;
+ private static String region = null;
+ private static String bucketName = null;
+
+ private static MinioClient minioClient;
+
+ public static final boolean shouldEmptyBucket = false; // Set true only for testing!
+ public static final String credentialsFilePath = System.getProperty("user.dir") + File.separator + "S3_minIO_credentials.txt";
+ private static final boolean shouldShowAllS3Buckets = false;
+
+
+ /**
+ * This must be called before any other methods.
+ * */
+ public S3ObjectStoreMinIO()
+ {
+ // Take the credentials from the file.
+ Scanner myReader = null;
+ try {
+ File credentialsFile = new File(credentialsFilePath);
+ if ( !credentialsFile.exists() ) {
+ throw new RuntimeException("credentialsFile \"" + credentialsFilePath + "\" does not exists!");
+ }
+ myReader = new Scanner(credentialsFile);
+ if ( myReader.hasNextLine() ) {
+ String[] credentials = myReader.nextLine().split(",");
+ if ( credentials.length < 5 ) {
+ throw new RuntimeException("Not all credentials were retrieved from file \"" + credentialsFilePath + "\"!");
+ }
+ endpoint = credentials[0].trim();
+ accessKey = credentials[1].trim();
+ secretKey = credentials[2].trim();
+ region = credentials[3].trim();
+ bucketName = credentials[4].trim();
+ }
+ } catch (Exception e) {
+ String errorMsg = "An error prevented the retrieval of the minIO credentials from the file: " + credentialsFilePath + "\n" + e.getMessage();
+ logger.error(errorMsg, e);
+ System.err.println(errorMsg);
+ System.exit(53);
+ } finally {
+ if ( myReader != null )
+ myReader.close();
+ }
+
+ if ( (endpoint == null) || (accessKey == null) || (secretKey == null) || (region == null) || (bucketName == null) ) {
+ String errorMsg = "No \"endpoint\" or/and \"accessKey\" or/and \"secretKey\" or/and \"region\" or/and \"bucketName\" could be retrieved from the file: " + credentialsFilePath;
+ logger.error(errorMsg);
+ System.err.println(errorMsg);
+ System.exit(54);
+ }
+ // It's not safe, nor helpful to show the credentials in the logs.
+
+ minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).region(region).build();
+
+ boolean bucketExists = false;
+ try {
+ bucketExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
+ } catch (Exception e) {
+ String errorMsg = "There was a problem while checking if the bucket \"" + bucketName + "\" exists!\n" + e.getMessage();
+ logger.error(errorMsg);
+ System.err.println(errorMsg);
+ System.exit(55);
+ }
+
+ // Keep this commented-out to avoid objects-deletion by accident. The code is open-sourced, so it's easy to enable this ability if we really want it (e.g. for testing).
+/* if ( bucketExists && shouldEmptyBucket ) {
+ emptyBucket(bucketName, false);
+ //throw new RuntimeException("stop just for test!");
+ }*/
+
+ // Make the bucket, if it does not exist.
+ try {
+ if ( !bucketExists ) {
+ logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it..");
+ minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
+ }
+ else
+ logger.warn("Bucket \"" + bucketName + "\" already exists.");
+ } catch (Exception e) {
+ String errorMsg = "Could not create the bucket \"" + bucketName + "\"!";
+ logger.error(errorMsg, e);
+ System.err.println(errorMsg);
+ System.exit(56);
+ }
+
+ if ( shouldShowAllS3Buckets ) {
+ List<Bucket> buckets = null;
+ try {
+ buckets = minioClient.listBuckets();
+ logger.debug("The buckets in the S3 ObjectStore are:");
+ for ( Bucket bucket : buckets ) {
+ logger.debug(bucket.name());
+ }
+ } catch (Exception e) {
+ logger.warn("Could not listBuckets: " + e.getMessage());
+ }
+ }
+ }
+
+
+ public static final Pattern EXTENSION_PATTERN = Pattern.compile("(\\.[^.]+)$");
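+ // Note: the matched group includes the leading dot, e.g. matching "file.pdf" yields ".pdf".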
+
+ /**
+ * @param fileObjKeyName the object key-name under which the file will be stored
+ * @param fileFullPath the full path of the local file to upload
+ * @return the S3-url of the uploaded file, or "null" in case of an error
+ */
+ public static String uploadToS3(String fileObjKeyName, String fileFullPath)
+ {
+ String contentType = null;
+
+ // Take the Matcher to retrieve the extension.
+ Matcher extensionMatcher = EXTENSION_PATTERN.matcher(fileFullPath);
+ if ( extensionMatcher.find() ) {
+ String extension = null;
+ if ( (extension = extensionMatcher.group(0)) == null )
+ contentType = "application/pdf";
+ else {
+ if ( extension.equals("pdf") )
+ contentType = "application/pdf";
+ /*else if ( *//* TODO - other-extension-match *//* )
+ contentType = "application/pdf"; */
+ else
+ contentType = "application/pdf";
+ }
+ } else {
+ logger.warn("The file with key \"" + fileObjKeyName + "\" does not have a file-extension! Setting the \"pdf\"-mimeType.");
+ contentType = "application/pdf";
+ }
+
+ ObjectWriteResponse response;
+ try {
+ response = minioClient.uploadObject(UploadObjectArgs.builder()
+ .bucket(bucketName)
+ .object(fileObjKeyName).filename(fileFullPath)
+ .contentType(contentType).build());
+
+ // TODO - What if the fileObjKeyName already exists?
+ // Right now it gets overwritten (unless we add versioning, which is irrelevant for different objects..)
+
+ } catch (Exception e) {
+ logger.error("Could not upload the file \"" + fileObjKeyName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
+ return null;
+ }
+
+ String s3Url = endpoint + "/" + bucketName + "/" + fileObjKeyName; // Be aware: This url works only if the access to the bucket is public.
+ //logger.debug("Uploaded file \"" + fileObjKeyName + "\". The s3Url is: " + s3Url);
+ return s3Url;
+ }
+
+
+ public static boolean emptyBucket(String bucketName, boolean shouldDeleteBucket)
+ {
+ logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\"");
+
+ // First list the objects of the bucket.
+ Iterable<Result<Item>> results;
+ try {
+ results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
+ } catch (Exception e) {
+ logger.error("Could not retrieve the list of objects of bucket \"" + bucketName + "\"!");
+ return false;
+ }
+
+ // Then, delete the objects.
+ for ( Result<Item> resultItem : results ) {
+ try {
+ if ( !deleteFile(resultItem.get().objectName(), bucketName) ) {
+ logger.error("Cannot proceed with bucket deletion, since only an empty bucket can be removed!");
+ return false;
+ }
+ } catch (Exception e) {
+ logger.error("Error getting the object from resultItem: " + resultItem.toString() + "\nThe bucket \"" + bucketName + "\" will not be able to be deleted! Exception message: " + e.getMessage());
+ return false;
+ }
+ }
+
+ if ( shouldDeleteBucket ) {
+ // Lastly, delete the empty bucket.
+ try {
+ minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+ } catch (Exception e) {
+ logger.error("Could not delete the bucket \"" + bucketName + "\" from the S3 ObjectStore, exception: " + e.getMessage(), e);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
+ public static boolean deleteFile(String fileObjKeyName, String bucketName)
+ {
+ try {
+ minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(fileObjKeyName).build());
+ } catch (Exception e) {
+ logger.error("Could not delete the file \"" + fileObjKeyName + "\" from the S3 ObjectStore, exception: " + e.getMessage(), e);
+ return false;
+ }
+ return true;
+ }
+
+
+}
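Finally, a minimal usage sketch for the S3-helper (the file-name and path below are placeholder values):

```java
// Hypothetical upload of one extracted full-text; returns the public S3-url, or null on failure.
String s3Url = S3ObjectStoreMinIO.uploadToS3("docid_1234.pdf",
        "/tmp/fullTexts/assignments_42/batch_1/docid_1234.pdf");
if ( s3Url == null ) {
    // On failure, the caller marks the payload as unretrieved (see FileUtils.setUnretrievedFullText()).
}
```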