- Improve performance when inserting records into the "attempt" table, by splitting the records equally, across more threads.

- Bring back the "UriBuilder", which informs us in the logs, about the Controller's url (IP, PORT, API).
- Code cleanup.
This commit is contained in:
Lampros Smyrnaios 2022-02-22 13:54:16 +02:00
parent dfd40cb105
commit ad5dbdde9b
4 changed files with 142 additions and 120 deletions

View File

@ -1,8 +1,10 @@
package eu.openaire.urls_controller; package eu.openaire.urls_controller;
import eu.openaire.urls_controller.controllers.UrlController; import eu.openaire.urls_controller.controllers.UrlController;
import eu.openaire.urls_controller.util.UriBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -61,10 +63,9 @@ public class Application {
} }
// @Bean
// @Bean public CommandLineRunner setServerBaseUrl(Environment environment) {
// public CommandLineRunner setServerBaseUrl(Environment environment) { return args -> new UriBuilder(environment);
// return args -> new UriBuilder(environment); }
// }
} }

View File

@ -1,5 +1,6 @@
package eu.openaire.urls_controller.controllers; package eu.openaire.urls_controller.controllers;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error; import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.models.*;
@ -48,7 +49,7 @@ public class UrlController {
@Value("${services.pdfaggregation.controller.db.databaseName}") @Value("${services.pdfaggregation.controller.db.databaseName}")
private String databaseName; private String databaseName;
private AtomicInteger maxAttemptsPerRecordAtomic; private final AtomicInteger maxAttemptsPerRecordAtomic;
public UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) { public UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) {
@ -82,20 +83,24 @@ public class UrlController {
} }
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" + String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" + "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count\n" +
"from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" + "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
"from " + databaseName + ".publication p\n" + "from " + databaseName + ".publication p\n" +
"join " + databaseName + ".publication_urls pu on pu.id=p.id\n" + "join " + databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + databaseName + ".datasource d on d.id=p.datasourceid\n" + "join " + databaseName + ".datasource d on d.id=p.datasourceid\n" +
"left outer join (select count(a.id) as counts, a.id from " + databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" + "left outer join (select count(a.id) as counts, a.id from " + databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
"left outer join (\n" + "left outer join (select a.id, a.original_url from " + databaseName + ".assignment a\n" +
" select a.id, a.original_url from " + databaseName + ".assignment a\n" + "union all\n" +
" union all\n" + "select pl.id, pl.original_url from " + databaseName + ".payload pl)\n" +
" select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" + "as existing on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() +
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" + "\nand not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
"limit " + (assignmentsLimit * 10) + ")\n" +
"as non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" + "order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit " + assignmentsLimit + ") as findAssignmentsQuery"; "limit " + assignmentsLimit + ")\n" +
"as findAssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
//logger.debug(findAssignmentsQuery); // DEBUG! //logger.debug(findAssignmentsQuery); // DEBUG!
@ -213,7 +218,7 @@ public class UrlController {
} }
public static ExecutorService insertsExecutor = Executors.newFixedThreadPool(2); public static ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
@PostMapping("addWorkerReport") @PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) { public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
@ -259,6 +264,8 @@ public class UrlController {
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false); fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
} }
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
// Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons. // Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons.
final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
final int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; final int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
@ -268,10 +275,15 @@ public class UrlController {
final AtomicInteger failedCount = new AtomicInteger(0); final AtomicInteger failedCount = new AtomicInteger(0);
List<Callable<Void>> callableTasks = new ArrayList<>(2); // Split the "UrlReports" into some sub-lists
int sizeOfEachSubList = (int)(urlReports.size() * 0.2);
List<List<UrlReport>> subLists = Lists.partition(urlReports, sizeOfEachSubList);
// The above will create some sub-lists, each one containing 20% of total amount.
List<Callable<Void>> callableTasks = new ArrayList<>(6);
// One thread will handle the inserts to the "payload" table adn the other to the "attempt" table. This way there will be as little blocking as possible (from the part of Impala). // One thread will handle the inserts to the "payload" table adn the other to the "attempt" table. This way there will be as little blocking as possible (from the part of Impala).
callableTasks.add(() -> { // Handle inserts to the "payload" table. callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
for ( UrlReport urlReport : urlReports ) { for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload(); Payload payload = urlReport.getPayload();
if ( payload == null ) { if ( payload == null ) {
@ -286,8 +298,7 @@ public class UrlController {
try { try {
Long size = payload.getSize(); Long size = payload.getSize();
Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(), Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(),
payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(), payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(), fileLocation, payload.getProvenance()};
fileLocation, payload.getProvenance()};
jdbcTemplate.update(insertIntoPayloadBaseQuery, args, payloadArgTypes); jdbcTemplate.update(insertIntoPayloadBaseQuery, args, payloadArgTypes);
} catch (Exception e) { } catch (Exception e) {
@ -298,32 +309,13 @@ public class UrlController {
return null; return null;
}); });
callableTasks.add(() -> { // Handle inserts to the "attempt" table. for ( int i = 0; i < subLists.size(); ++i ) {
for ( UrlReport urlReport : urlReports ) { int finalI = i;
Payload payload = urlReport.getPayload(); callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
if ( payload == null ) { runInsertsToAttemptTable(subLists.get(finalI), curReportAssignments, insertIntoAttemptBaseQuery, attemptArgTypes, failedCount);
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(),
urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()};
jdbcTemplate.update(insertIntoAttemptBaseQuery, args, attemptArgTypes);
} catch (Exception e) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + e.getMessage());
failedCount.incrementAndGet();
}
}
return null; return null;
}); });
}
ImpalaConnector.databaseLock.lock(); ImpalaConnector.databaseLock.lock();
@ -383,6 +375,34 @@ public class UrlController {
} }
private void runInsertsToAttemptTable(List<UrlReport> urlReports, long curReportAssignments, String insertIntoAttemptBaseQuery, int[] attemptArgTypes, AtomicInteger failedCount )
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(),
urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()};
jdbcTemplate.update(insertIntoAttemptBaseQuery, args, attemptArgTypes);
} catch (Exception e) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + e.getMessage());
failedCount.incrementAndGet();
}
}
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution. // The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested. // Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException { private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {

View File

@ -251,7 +251,7 @@ public class FileUtils {
// Else, the record will have its file-data set to "null", in the end of this method. // Else, the record will have its file-data set to "null", in the end of this method.
} }
logger.info("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore."); //logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
// (fileNames.length -1) --> minus the zip-file // (fileNames.length -1) --> minus the zip-file
} catch (Exception e) { } catch (Exception e) {
@ -274,7 +274,7 @@ public class FileUtils {
private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) { private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) {
baseUrl += batchNum + "/"; baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch); String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]"); //logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
HttpURLConnection conn = null; HttpURLConnection conn = null;
try { try {
conn = (HttpURLConnection) new URL(requestUrl).openConnection(); conn = (HttpURLConnection) new URL(requestUrl).openConnection();

View File

@ -11,74 +11,75 @@ import java.net.InetAddress;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
public class UriBuilder { public class UriBuilder {
// private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class); private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class);
//
// public static String baseUrl = null; public static String baseUrl = null;
//
// public UriBuilder(Environment environment) { public UriBuilder(Environment environment) {
// baseUrl = "http"; baseUrl = "http";
// String sslEnabled = environment.getProperty("server.ssl.enabled"); String sslEnabled = environment.getProperty("server.ssl.enabled");
// if (sslEnabled == null) { // It's expected to not exist if there is no SSL-configuration. if (sslEnabled == null) { // It's expected to not exist if there is no SSL-configuration.
// logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP.."); logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP..");
// sslEnabled = "false"; sslEnabled = "false";
// } }
// baseUrl += sslEnabled.equals("true") ? "s" : ""; baseUrl += sslEnabled.equals("true") ? "s" : "";
// baseUrl += "://"; baseUrl += "://";
//
// String hostName = getPublicIP(); String hostName = getPublicIP();
// if ( hostName == null ) if ( hostName == null )
// hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null. hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null.
//
// baseUrl += hostName; baseUrl += hostName;
// String serverPort = environment.getProperty("server.port"); String serverPort = environment.getProperty("server.port");
// if (serverPort == null) { // This is unacceptable! if (serverPort == null) { // This is unacceptable!
// logger.error("No property \"server.port\" was found in \"application.properties\"!"); logger.error("No property \"server.port\" was found in \"application.properties\"!");
// System.exit(-1); // Well, I guess the Spring Boot would not start in this case anyway. System.exit(-1); // Well, I guess the Spring Boot would not start in this case anyway.
// } }
// baseUrl += ":" + serverPort; baseUrl += ":" + serverPort;
//
// String baseInternalPath = environment.getProperty("server.servlet.context-path"); String baseInternalPath = environment.getProperty("server.servlet.context-path");
// if ( baseInternalPath != null ) { if ( baseInternalPath != null ) {
// if ( !baseInternalPath.startsWith("/") ) if ( !baseInternalPath.startsWith("/") )
// baseUrl += "/"; baseUrl += "/";
// baseUrl += baseInternalPath; baseUrl += baseInternalPath;
// if ( !baseInternalPath.endsWith("/") ) if ( !baseInternalPath.endsWith("/") )
// baseUrl += "/"; baseUrl += "/";
// } else { } else {
// logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected. logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected.
// baseUrl += "/"; baseUrl += "/";
// } }
//
// logger.debug("ServerBaseURL: " + baseUrl); logger.debug("ServerBaseURL: " + baseUrl);
// } }
//
// private String getPublicIP() private String getPublicIP()
// { {
// String publicIpAddress = ""; String publicIpAddress = "";
// URL url_name; URL url_name;
// try { try {
// url_name = new URL("https://api.ipify.org/"); url_name = new URL("https://api.ipify.org/");
// } catch (MalformedURLException mue) { } catch (MalformedURLException mue) {
// logger.warn(mue.getMessage()); logger.warn(mue.getMessage());
// return null; return null;
// } }
// try (BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream()))) { try (BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream()))) {
// publicIpAddress = bf.readLine().trim(); publicIpAddress = bf.readLine().trim();
// } catch (Exception e) { } catch (Exception e) {
// logger.warn("Cannot get the publicIP address for this machine!", e); logger.warn("Cannot get the publicIP address for this machine!", e);
// return null; return null;
// } }
// return publicIpAddress; return publicIpAddress;
// } }
//
// public static String getBaseUrl() { public static String getBaseUrl() {
// return baseUrl; return baseUrl;
// } }
//
// public static void setBaseUrl(String baseUrl) { public static void setBaseUrl(String baseUrl) {
// UriBuilder.baseUrl = baseUrl; UriBuilder.baseUrl = baseUrl;
// } }
} }