diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index 1803e0b..8f65f93 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -1,7 +1,8 @@ package eu.openaire.urls_controller; -import eu.openaire.urls_controller.util.UriBuilder; -import org.springframework.boot.CommandLineRunner; +import eu.openaire.urls_controller.controllers.UrlController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -11,13 +12,17 @@ import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.CorsConfigurationSource; import org.springframework.web.cors.UrlBasedCorsConfigurationSource; +import javax.annotation.PreDestroy; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.TimeUnit; @SpringBootApplication @EnableScheduling public class Application { + private static final Logger logger = LoggerFactory.getLogger(Application.class); + public static void main(String[] args) { SpringApplication.run(Application.class, args); } @@ -33,6 +38,29 @@ public class Application { source.registerCorsConfiguration("/**", configuration); return source; } + + + @PreDestroy + public void closeThreads() { + logger.info("Shutting down the threads.."); + UrlController.insertsExecutor.shutdown(); // Define that no new tasks will be scheduled. + try { + if ( ! UrlController.insertsExecutor.awaitTermination(1, TimeUnit.MINUTES) ) { + logger.warn("The working threads did not finish on time! Stopping them immediately.."); + UrlController.insertsExecutor.shutdownNow(); + } + } catch (SecurityException se) { + logger.error("Could not shutdown the threads in any way..!", se); + } catch (InterruptedException ie) { + try { + UrlController.insertsExecutor.shutdownNow(); + } catch (SecurityException se) { + logger.error("Could not shutdown the threads in any way..!", se); + } + } + } + + // // @Bean // public CommandLineRunner setServerBaseUrl(Environment environment) { diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index 19e4cc6..eb47805 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -19,6 +19,10 @@ import javax.servlet.http.HttpServletRequest; import java.sql.*; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -212,6 +216,8 @@ public class UrlController { } + public static ExecutorService insertsExecutor = Executors.newFixedThreadPool(2); + @PostMapping("addWorkerReport") public ResponseEntity addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) { @@ -258,58 +264,84 @@ public class UrlController { // Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons. String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; + String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; - String payloadErrorMsg = null; - int failedCount = 0; + int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; + + final AtomicInteger failedCount = new AtomicInteger(0); + + List> callableTasks = new ArrayList<>(2); + // One thread will handle the inserts to the "payload" table adn the other to the "attempt" table. This way there will be as little blocking as possible (from the part of Impala). + + callableTasks.add(() -> { // Handle inserts to the "payload" table. + for ( UrlReport urlReport : urlReports ) { + Payload payload = urlReport.getPayload(); + if ( payload == null ) { + logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport); + continue; + } + + try { + Long size = payload.getSize(); + Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(), + payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(), + payload.getLocation(), payload.getProvenance()}; + + jdbcTemplate.update(insertIntoPayloadBaseQuery, args, payloadArgTypes); + } catch (Exception sqle) { + logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": ", sqle); + failedCount.incrementAndGet(); + } + } + return null; + }); + + callableTasks.add(() -> { // Handle inserts to the "attempt" table. + for ( UrlReport urlReport : urlReports ) { + Payload payload = urlReport.getPayload(); + if ( payload == null ) { + logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport); + continue; + } + + Error error = urlReport.getError(); + if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop). + logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members."); + error = new Error(null, null); + } + + try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons. + Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(), + urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()}; + + jdbcTemplate.update(insertIntoAttemptBaseQuery, args, attemptArgTypes); + } catch (Exception sqle) { + logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage()); + failedCount.incrementAndGet(); + } + } + return null; + }); ImpalaConnector.databaseLock.lock(); - // TODO - Think about handling this loop with multiple threads.. The Impala-server will handle the synchronization itself.. - for ( UrlReport urlReport : urlReports ) { - Payload payload = urlReport.getPayload(); - if ( payload == null ) { - logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments); - payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!"; - continue; - } + try { // Invoke all the tasks and wait for them to finish before moving to the next batch. + insertsExecutor.invokeAll(callableTasks); + } catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled. + logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage()); + // TODO - This is a very rare case, but what should be done..? + } catch (Exception e) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage(); + logger.error(errorMsg, e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } - try { - Long size = payload.getSize(); - Object[] args = new Object[] { - payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(), - payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(), - payload.getLocation(), payload.getProvenance()}; - int[] argTypes = new int[] { - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; - - jdbcTemplate.update(insertIntoPayloadBaseQuery, args, argTypes); - } catch (Exception sqle) { - logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": ", sqle); - } - - Error error = urlReport.getError(); - if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop). - logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members."); - error = new Error(null, null); - } - - try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons. - Object[] args = new Object[] { - payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(), - urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()}; - int[] argTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; - - jdbcTemplate.update(insertIntoAttemptBaseQuery, args, argTypes); - } catch (Exception sqle) { - logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage()); - } - }//end for-loop - - if ( payloadErrorMsg != null ) - logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables, although " + payloadErrorMsg + " Going to merge the parquet files for those tables."); - else - logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables."); + int failedQueries = failedCount.get(); + String failedQueriesMsg = failedQueries + " out of " + urlReports.size() + " failed to be processed!"; + logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables" + ((failedQueries > 0) ? (", although " + failedQueriesMsg) : ".") + + " Going to merge the parquet files for those tables."); String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null); if ( mergeErrorMsg != null ) { @@ -333,8 +365,9 @@ public class UrlController { } ImpalaConnector.databaseLock.unlock(); + logger.debug("Finished merging the database tables."); - return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg); + return ResponseEntity.status(HttpStatus.OK).body(failedQueriesMsg); } diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 3936dea..476bf88 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -362,9 +362,7 @@ public class FileUtils { if ( j < (numFullTextsCurBatch -1) ) sb.append(","); } - String requestUrl = sb.toString(); - - return requestUrl; + return sb.toString(); } private final int bufferSize = 20971520; // 20 MB