diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index 8f65f93..2fdf215 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -1,8 +1,10 @@ package eu.openaire.urls_controller; import eu.openaire.urls_controller.controllers.UrlController; +import eu.openaire.urls_controller.util.UriBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -61,10 +63,9 @@ public class Application { } -// -// @Bean -// public CommandLineRunner setServerBaseUrl(Environment environment) { -// return args -> new UriBuilder(environment); -// } + @Bean + public CommandLineRunner setServerBaseUrl(Environment environment) { + return args -> new UriBuilder(environment); + } } \ No newline at end of file diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index fdcd36b..36e2ed1 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -1,5 +1,6 @@ package eu.openaire.urls_controller.controllers; +import com.google.common.collect.Lists; import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.models.Error; import eu.openaire.urls_controller.models.*; @@ -48,7 +49,7 @@ public class UrlController { @Value("${services.pdfaggregation.controller.db.databaseName}") private String databaseName; - private AtomicInteger maxAttemptsPerRecordAtomic; + private final AtomicInteger maxAttemptsPerRecordAtomic; public 
UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) { @@ -82,20 +83,24 @@ public class UrlController { } String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" + - "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" + - "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" + - "from " + databaseName + ".publication p\n" + - "join " + databaseName + ".publication_urls pu on pu.id=p.id\n" + - "join " + databaseName + ".datasource d on d.id=p.datasourceid\n" + - "left outer join (select count(a.id) as counts, a.id from " + databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" + - "left outer join (\n" + - " select a.id, a.original_url from " + databaseName + ".assignment a\n" + - " union all\n" + - " select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" + - "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + - "limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" + - "order by coalesce(attempt_count, 0), reverse(pubid), url\n" + - "limit " + assignmentsLimit + ") as findAssignmentsQuery"; + "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count\n" + + "from (\n" + + "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" + + "from " + databaseName + ".publication p\n" + + "join " + databaseName + ".publication_urls pu on pu.id=p.id\n" + + "join " + databaseName + ".datasource d on d.id=p.datasourceid\n" + + "left outer join (select count(a.id) as counts, a.id from " + databaseName + 
".attempt a group by a.id) as attempts on attempts.id=p.id\n" + + "left outer join (select a.id, a.original_url from " + databaseName + ".assignment a\n" + + "union all\n" + + "select pl.id, pl.original_url from " + databaseName + ".payload pl)\n" + + "as existing on existing.id=p.id and existing.original_url=pu.url\n" + + "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + + "\nand not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + + "limit " + (assignmentsLimit * 10) + ")\n" + + "as non_distinct_results\n" + + "order by coalesce(attempt_count, 0), reverse(pubid), url\n" + + "limit " + assignmentsLimit + ")\n" + + "as findAssignmentsQuery"; // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. //logger.debug(findAssignmentsQuery); // DEBUG! @@ -213,7 +218,7 @@ public class UrlController { } - public static ExecutorService insertsExecutor = Executors.newFixedThreadPool(2); + public static ExecutorService insertsExecutor = Executors.newFixedThreadPool(6); @PostMapping("addWorkerReport") public ResponseEntity addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) { @@ -259,6 +264,8 @@ public class UrlController { fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false); } + logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments); + // Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons. 
final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; final int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; @@ -268,10 +275,15 @@ public class UrlController { final AtomicInteger failedCount = new AtomicInteger(0); - List<Callable<Void>> callableTasks = new ArrayList<>(2); + // Split the "UrlReports" into some sub-lists + int sizeOfEachSubList = Math.max(1, (int)(urlReports.size() * 0.2)); + List<List<UrlReport>> subLists = Lists.partition(urlReports, sizeOfEachSubList); + // The above will create some sub-lists, each one containing around 20% of the total amount. The size is clamped to at least 1, because "Lists.partition()" throws an "IllegalArgumentException" for a non-positive size (which happens for fewer than 5 urlReports). + + List<Callable<Void>> callableTasks = new ArrayList<>(6); // One thread will handle the inserts to the "payload" table adn the other to the "attempt" table. This way there will be as little blocking as possible (from the part of Impala). - callableTasks.add(() -> { // Handle inserts to the "payload" table. + callableTasks.add(() -> { // Handle inserts to the "payload" table. This single task iterates over all of the "urlReports". for ( UrlReport urlReport : urlReports ) { Payload payload = urlReport.getPayload(); if ( payload == null ) { @@ -286,8 +298,7 @@ public class UrlController { try { Long size = payload.getSize(); Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(), - payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(), - fileLocation, payload.getProvenance()}; + payload.getMime_type(), (size != null) ?
String.valueOf(size) : null, payload.getHash(), fileLocation, payload.getProvenance()}; jdbcTemplate.update(insertIntoPayloadBaseQuery, args, payloadArgTypes); } catch (Exception e) { @@ -298,32 +309,13 @@ public class UrlController { return null; }); - callableTasks.add(() -> { // Handle inserts to the "attempt" table. - for ( UrlReport urlReport : urlReports ) { - Payload payload = urlReport.getPayload(); - if ( payload == null ) { - logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport); - continue; - } - - Error error = urlReport.getError(); - if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop). - logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members."); - error = new Error(null, null); - } - - try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons. - Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(), - urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()}; - - jdbcTemplate.update(insertIntoAttemptBaseQuery, args, attemptArgTypes); - } catch (Exception e) { - logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + e.getMessage()); - failedCount.incrementAndGet(); - } - } - return null; - }); + for ( int i = 0; i < subLists.size(); ++i ) { + int finalI = i; + callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries. 
+ runInsertsToAttemptTable(subLists.get(finalI), curReportAssignments, insertIntoAttemptBaseQuery, attemptArgTypes, failedCount); + return null; + }); + } ImpalaConnector.databaseLock.lock(); @@ -383,6 +375,34 @@ public class UrlController { } + private void runInsertsToAttemptTable(List urlReports, long curReportAssignments, String insertIntoAttemptBaseQuery, int[] attemptArgTypes, AtomicInteger failedCount ) + { + for ( UrlReport urlReport : urlReports ) { + Payload payload = urlReport.getPayload(); + if ( payload == null ) { + logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport); + continue; + } + + Error error = urlReport.getError(); + if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop). + logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members."); + error = new Error(null, null); + } + + try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons. + Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(), + urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()}; + + jdbcTemplate.update(insertIntoAttemptBaseQuery, args, attemptArgTypes); + } catch (Exception e) { + logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + e.getMessage()); + failedCount.incrementAndGet(); + } + } + } + + // The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution. // Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested. 
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException { diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index d80dd43..0d197a4 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -251,7 +251,7 @@ public class FileUtils { // Else, the record will have its file-data set to "null", in the end of this method. } - logger.info("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore."); + //logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore."); // (fileNames.length -1) --> minus the zip-file } catch (Exception e) { @@ -274,7 +274,7 @@ public class FileUtils { private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List fileNamesForCurBatch, int totalBatches, String workerId) { baseUrl += batchNum + "/"; String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch); - logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]"); + //logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]"); 
HttpURLConnection conn = null; try { conn = (HttpURLConnection) new URL(requestUrl).openConnection(); diff --git a/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java b/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java index c1da4a4..329dc11 100644 --- a/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java +++ b/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java @@ -11,74 +11,75 @@ import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; + public class UriBuilder { -// private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class); -// -// public static String baseUrl = null; -// -// public UriBuilder(Environment environment) { -// baseUrl = "http"; -// String sslEnabled = environment.getProperty("server.ssl.enabled"); -// if (sslEnabled == null) { // It's expected to not exist if there is no SSL-configuration. -// logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP.."); -// sslEnabled = "false"; -// } -// baseUrl += sslEnabled.equals("true") ? "s" : ""; -// baseUrl += "://"; -// -// String hostName = getPublicIP(); -// if ( hostName == null ) -// hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null. -// -// baseUrl += hostName; -// String serverPort = environment.getProperty("server.port"); -// if (serverPort == null) { // This is unacceptable! -// logger.error("No property \"server.port\" was found in \"application.properties\"!"); -// System.exit(-1); // Well, I guess the Spring Boot would not start in this case anyway. 
-// } -// baseUrl += ":" + serverPort; -// -// String baseInternalPath = environment.getProperty("server.servlet.context-path"); -// if ( baseInternalPath != null ) { -// if ( !baseInternalPath.startsWith("/") ) -// baseUrl += "/"; -// baseUrl += baseInternalPath; -// if ( !baseInternalPath.endsWith("/") ) -// baseUrl += "/"; -// } else { -// logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected. -// baseUrl += "/"; -// } -// -// logger.debug("ServerBaseURL: " + baseUrl); -// } -// -// private String getPublicIP() -// { -// String publicIpAddress = ""; -// URL url_name; -// try { -// url_name = new URL("https://api.ipify.org/"); -// } catch (MalformedURLException mue) { -// logger.warn(mue.getMessage()); -// return null; -// } -// try (BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream()))) { -// publicIpAddress = bf.readLine().trim(); -// } catch (Exception e) { -// logger.warn("Cannot get the publicIP address for this machine!", e); -// return null; -// } -// return publicIpAddress; -// } -// -// public static String getBaseUrl() { -// return baseUrl; -// } -// -// public static void setBaseUrl(String baseUrl) { -// UriBuilder.baseUrl = baseUrl; -// } + private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class); + + public static String baseUrl = null; + + public UriBuilder(Environment environment) { + baseUrl = "http"; + String sslEnabled = environment.getProperty("server.ssl.enabled"); + if (sslEnabled == null) { // It's expected to not exist if there is no SSL-configuration. + logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP.."); + sslEnabled = "false"; + } + baseUrl += sslEnabled.equals("true") ? "s" : ""; + baseUrl += "://"; + + String hostName = getPublicIP(); + if ( hostName == null ) + hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null. 
+ + baseUrl += hostName; + String serverPort = environment.getProperty("server.port"); + if (serverPort == null) { // This is unacceptable! + logger.error("No property \"server.port\" was found in \"application.properties\"!"); + System.exit(-1); // Well, I guess the Spring Boot would not start in this case anyway. + } + baseUrl += ":" + serverPort; + + String baseInternalPath = environment.getProperty("server.servlet.context-path"); + if ( baseInternalPath != null ) { + if ( !baseInternalPath.startsWith("/") ) + baseUrl += "/"; + baseUrl += baseInternalPath; + if ( !baseInternalPath.endsWith("/") ) + baseUrl += "/"; + } else { + logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected. + baseUrl += "/"; + } + + logger.debug("ServerBaseURL: " + baseUrl); + } + + private String getPublicIP() + { + String publicIpAddress = ""; + URL url_name; + try { + url_name = new URL("https://api.ipify.org/"); + } catch (MalformedURLException mue) { + logger.warn(mue.getMessage()); + return null; + } + try (BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream()))) { + publicIpAddress = bf.readLine().trim(); + } catch (Exception e) { + logger.warn("Cannot get the publicIP address for this machine!", e); + return null; + } + return publicIpAddress; + } + + public static String getBaseUrl() { + return baseUrl; + } + + public static void setBaseUrl(String baseUrl) { + UriBuilder.baseUrl = baseUrl; + } } \ No newline at end of file