Improve aggregation speed by generating additional "attempt" and "payload" records for the publications which are in the backlog and whose url matches one of the urls of the current payloads.

Lampros Smyrnaios 2023-10-04 15:43:31 +03:00
parent b702cf4484
commit 7019f7c3c7
3 changed files with 199 additions and 49 deletions


@@ -318,11 +318,16 @@ public class UrlsServiceImpl implements UrlsService {
return false;
}
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, localParquetPath, uploadFullTextsResponse);
boolean hasAttemptParquetFileProblem = false;
boolean hasPayloadParquetFileProblem = false;
DatabaseConnector.databaseLock.lock();
// Lock the DB here so the prefilled-Payloads which will be generated inside the "getTasksForCreatingAndUploadingParquetFiles()" method (using a dedicated query)
// will be synchronized with the insert of all attempt and payload records to the DB.
// This is important in order to avoid having workers take these records as assignments, when we know that payloads are ready to be inserted for them.
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, localParquetPath, uploadFullTextsResponse);
try { // Invoke all the tasks and wait for them to finish.
List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);
@@ -330,14 +335,16 @@ public class UrlsServiceImpl implements UrlsService {
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) { // The related log is already shown.
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
DatabaseConnector.databaseLock.unlock();
return false;
}
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem ) {
DatabaseConnector.databaseLock.unlock();
throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignmentsCounter);
else {
} else {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter);
else if ( hasPayloadParquetFileProblem )
@@ -347,7 +354,6 @@ public class UrlsServiceImpl implements UrlsService {
}
// Load all the parquet files of each type into its table.
DatabaseConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt");
@@ -364,15 +370,18 @@ public class UrlsServiceImpl implements UrlsService {
else
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
} catch (InterruptedException ie) { // Thrown by "insertsExecutor.invokeAll()". In this case, any unfinished tasks are cancelled.
DatabaseConnector.databaseLock.unlock();
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with table-merging.
} catch (RuntimeException re) {
// Only one of the RuntimeExceptions is thrown inside the DB-locked code, and there the "unlock" happens before it is thrown.
String errorMsg = re.getMessage();
logger.error(errorMsg);
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
return false;
} catch (Exception e) {
} catch (Exception e) { // Thrown by "insertsExecutor.invokeAll()".
DatabaseConnector.databaseLock.unlock();
String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
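
For orientation, here is a condensed, self-contained sketch (not the project's actual classes; the parquet tasks and the table-loading are stubbed out) of the lock ordering described by the comments above: the DB-lock is taken before the parquet tasks are created, so the backlog-matching query, the parallel inserts and the table loads run as one synchronized unit, and the lock is released only afterwards. The real code unlocks explicitly on each error path; the try/finally below gives the same guarantee in a simpler form.

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LockOrderingSketch {

    private static final ReentrantLock databaseLock = new ReentrantLock();
    private static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws Exception {
        databaseLock.lock(); // Lock BEFORE creating the tasks, so the backlog-matching query and the inserts/loads form one synchronized unit.
        try {
            List<Callable<Boolean>> tasks = createTasks(); // here the prefilled payloads would be computed
            List<Future<Boolean>> futures = insertsExecutor.invokeAll(tasks); // create/upload the parquet files in parallel
            for ( Future<Boolean> future : futures )
                future.get();
            loadParquetIntoTables(); // the "attempt" and "payload_aggregated" loads happen under the same lock
        } finally {
            databaseLock.unlock(); // Only now may workers receive new assignments.
            insertsExecutor.shutdown();
        }
    }

    private static List<Callable<Boolean>> createTasks() { // stand-in for getTasksForCreatingAndUploadingParquetFiles()
        return IntStream.range(0, 4)
                .mapToObj(i -> (Callable<Boolean>) () -> true) // stub for createAndLoadParquetDataInto...Table()
                .collect(Collectors.toList());
    }

    private static void loadParquetIntoTables() { /* stub for loadParquetDataIntoTable(...) */ }
}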


@@ -5,6 +5,7 @@ import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
@@ -24,10 +25,12 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.sql.Types;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -38,6 +41,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Component
public class FileUtils {
@@ -696,6 +701,114 @@ public class FileUtils {
}
/**
* This method searches the backlog of publications for the ones whose url is equal to the "original_url" or the "actual_url" of an existing payload.
* Then, for each match, it generates a new "UrlReport" object, whose payload is a previously generated one, with its "id" and "original_url" replaced by those of the backlog record.
* Then, the program automatically generates "attempt" and "payload" records for these additional UrlReport-records.
* It must be executed inside the same "database-locked" block of code, along with the inserts of the attempt and payload records.
* */
public boolean addUrlReportsByMatchingRecordsFromBacklog(List<UrlReport> urlReports, List<Payload> initialPayloads, long assignmentsBatchCounter)
{
int numInitialPayloads = initialPayloads.size();
logger.debug("numInitialPayloads: " + numInitialPayloads + " | assignmentsBatchCounter: " + assignmentsBatchCounter);
// Create a HashMultimap, containing the "original_url" or "actual_url" as the key and the related "payload" objects as its values.
final SetMultimap<String, Payload> urlToPayloadsMultimap = Multimaps.synchronizedSetMultimap(HashMultimap.create((numInitialPayloads / 3), 3)); // Holds multiple values for any key, in case a url (key) is related to multiple payloads (values).
for ( Payload payload : initialPayloads ) {
String original_url = payload.getOriginal_url();
String actual_url = payload.getActual_url();
// Link this payload with both the original and actual urls (in case they are different).
urlToPayloadsMultimap.put(original_url, payload);
if ( ! actual_url.equals(original_url) )
urlToPayloadsMultimap.put(actual_url, payload);
}
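// For illustration (hypothetical urls): a payload downloaded from original_url "https://doi.org/10.1/x" that was
// redirected to actual_url "https://repo.example.org/x.pdf" is registered under both keys, while a payload whose
// original and actual urls are identical is registered under a single key.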
// A url may be related to different payloads in the urlReports. For example, in one payload the url may be the original_url,
// while in another payload it may be only the actual_url (the original was different).
// Gather the original and actual urls of the current payloads and add them in a list usable by the query.
List<String> urlsToRetrieveRelatedIDs = initialPayloads.parallelStream()
.flatMap(payload -> Stream.of(payload.getOriginal_url(), payload.getActual_url())) // Add both "original_url" and "actual_url" in the final results.
.collect(Collectors.toList());
// Prepare the "urlsToRetrieveRelatedIDs" to be used inside the "getDataForPayloadPrefillQuery". Create the following string-pattern: ("URL_1", "URL_2", ...)
int urlsToRetrieveRelatedIDsSize = urlsToRetrieveRelatedIDs.size();
StringBuilder relatedIDsStringBuilder = new StringBuilder(urlsToRetrieveRelatedIDsSize * 100);
relatedIDsStringBuilder.append("(");
for ( int i=0; i < urlsToRetrieveRelatedIDsSize; ++i ) {
relatedIDsStringBuilder.append("\"").append(urlsToRetrieveRelatedIDs.get(i)).append("\"");
if ( i < (urlsToRetrieveRelatedIDsSize -1) )
relatedIDsStringBuilder.append(", ");
}
relatedIDsStringBuilder.append(")");
// Get the id and url of any publication in the backlog, whose url matches one of the gathered urls and which does not yet have an "attempt" or "payload" record for that url.
String getDataForPayloadPrefillQuery = "select pu.id, pu.url\n" +
"from " + DatabaseConnector.databaseName + ".publication_urls pu\n" +
"left anti join " + DatabaseConnector.databaseName + ".attempt a on a.id=pu.id and a.original_url=pu.url\n" +
"left anti join " + DatabaseConnector.databaseName + ".payload p on p.id=pu.id and p.original_url=pu.url\n" +
"where pu.url in " + relatedIDsStringBuilder;
//logger.trace("getDataForPayloadPrefillQuery:\n" + getDataForPayloadPrefillQuery);
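// For illustration only (placeholder database name, hypothetical urls), the rendered query looks roughly like:
//   select pu.id, pu.url
//   from <databaseName>.publication_urls pu
//   left anti join <databaseName>.attempt a on a.id=pu.id and a.original_url=pu.url
//   left anti join <databaseName>.payload p on p.id=pu.id and p.original_url=pu.url
//   where pu.url in ("https://example.org/a", "https://example.org/b")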
final List<Payload> prefilledPayloads = new ArrayList<>(1000);
try {
jdbcTemplate.query(getDataForPayloadPrefillQuery, rs -> {
String id;
String original_url;
try { // For each of the 2 columns returned, do the following. The column-indexing starts from 1.
id = rs.getString(1);
original_url = rs.getString(2);
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
return; // Move to the next payload.
}
Set<Payload> foundPayloads = urlToPayloadsMultimap.get(original_url);
// Select a random "foundPayload" to use its data to fill the "prefilledPayload" (in a "Set" the first element is pseudo-random)
Optional<Payload> optPayload = foundPayloads.stream().findFirst();
if ( !optPayload.isPresent() ) {
logger.error("Could not retrieve any payload for the \"original_url\": " + original_url);
return; // Move to the next payload.
}
Payload prefilledPayload = optPayload.get(); // We change just the id and the original_url.
prefilledPayload.setId(id);
prefilledPayload.setOriginal_url(original_url);
prefilledPayloads.add(prefilledPayload);
});
} catch (EmptyResultDataAccessException erdae) {
String errorMsg = "No results retrieved from the \"getDataForPayloadPrefillQuery\", when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + ".";
logger.error(errorMsg);
return false;
} catch (Exception e) {
DatabaseConnector.handleQueryException("getDataForPayloadPrefillQuery", getDataForPayloadPrefillQuery, e);
return false;
}
int numPrefilledPayloads = prefilledPayloads.size();
if ( numPrefilledPayloads == 0 ) {
String errorMsg = "Some results were retrieved from the \"getDataForPayloadPrefillQuery\", but no data could be extracted from them, when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + ".";
logger.error(errorMsg);
return false;
}
logger.debug("numPrefilledPayloads: " + numPrefilledPayloads + " | assignmentsBatchCounter: " + assignmentsBatchCounter);
// Add the prefilled payloads to the UrlReports.
final Error noError = new Error(null, null);
for ( Payload prefilledPayload : prefilledPayloads )
{
urlReports.add(new UrlReport(UrlReport.StatusType.accessible, prefilledPayload, noError));
}
logger.debug("Final number of UrlReports is " + urlReports.size() + " | assignmentsBatchCounter: " + assignmentsBatchCounter);
// In order to avoid assigning these "prefill" records to workers before they are inserted into the attempt and payload tables,
// we have to make sure this method is called inside a "DB-locked" code block and that the "DB-unlock" happens only after all records are loaded into the DB-tables.
return true;
}
/**
* Set the fileLocation for all those Payloads related to the File.
* @param filePayloads


@@ -44,6 +44,7 @@ import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@Component
public class ParquetFileUtils {
@@ -66,10 +67,10 @@ public class ParquetFileUtils {
private final String hdfsUserName;
public static final String payloadSchemaFilePath = "schemas/payload.avsc";
private static final String attemptSchemaFilePath = "schemas/attempt.avsc";
public static final String payloadSchemaFilePath = "schemas/payload.avsc";
public static Schema payloadsSchema = null;
public Schema attemptsSchema;
@@ -153,51 +154,78 @@ public class ParquetFileUtils {
}
/**
* The tasks created below must be executed inside a "database-lock" block.
* */
public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
{
// Split the "UrlReports" into some sub-lists.
List<List<UrlReport>> subLists;
// Pre-define the tasks to run.
List<Callable<ParquetReport>> callableTasks = new ArrayList<>(6);
// One thread will handle the inserts to the "payload" table and the others to the "attempt" table. This way there will be as little blocking as possible (from the part of Impala).
int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.2);
if ( sizeOfEachSubList > 10 )
{
subLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be above < 0 >.
// The above will create some sub-lists, each one containing 20% of total amount.
int subListsSize = subLists.size();
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath));
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the attempts.
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath));
});
}
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful )
{
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
});
}
if ( (attemptsSchema == null) // Parse the schema if it's not already parsed.
&& ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the attemptsSchema! Exiting.."); // The cause is already logged inside the above method.
System.exit(89); // Exit the whole app, as it cannot add the results to the database!
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
// Pre-define the tasks to run in multiple threads.
final List<Callable<ParquetReport>> callableTasks = new ArrayList<>(8); // 5 threads will handle the attempts and 3 the payloads.
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful )
{
List<Payload> currentPayloads = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.collect(Collectors.toList());
if ( currentPayloads.size() > 0 ) // If at least 1 payload was created by the processed records..
{ // (it's ok to have no payloads, if there were no full-texts available)
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.
System.exit(89); // Exit the whole app, as it cannot add the results to the database!
}
// The UrlReports for the pre-filled payloads are added only here, since we do not want "attempt" records for these.
fileUtils.addUrlReportsByMatchingRecordsFromBacklog(urlReports, currentPayloads, curReportAssignments); // This will add more objects to the "urlReports" list.
// In case the above method returns an error, nothing special happens; we just have only the initial payloads to insert into the DB.
int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.33); // We want 3 sub-lists for the payloads.
// There may be 1 more sub-list with very few elements, due to uneven splitting. Unfortunately, we cannot set the number of splits, only the size of most splits.
if ( sizeOfEachSubList > 10 ) {
List<List<UrlReport>> finalSubLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be above < 0 >.
int numSubListsPayload = finalSubLists.size();
// We will run <numSubListsPayload> tasks for the payloads.
for ( int i = 0; i < numSubListsPayload; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 33% of the total amount.
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(finalI, finalSubLists.get(finalI), curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the payload (sizeOfEachSubList * 5).
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(0, urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
});
}
}
}
int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.2); // We want 5 sub-lists for the attempts.
// There may be 1 more sub-list with very few elements, due to uneven splitting. Unfortunately, we cannot set the number of splits, only the size of most splits.
if ( sizeOfEachSubList > 10 ) {
List<List<UrlReport>> finalSubLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be above < 0 >.
// The above will create some sub-lists, each one containing 20% of total amount.
int numSubListsAttempt = finalSubLists.size();
// We will run <numSubListsAttempt> tasks for the attempts.
for ( int i = 0; i < numSubListsAttempt; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, finalSubLists.get(finalI), curReportAssignments, currentParquetPath));
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the attempts (sizeOfEachSubList * 5).
callableTasks.add(() -> { // Handle inserts to the "attempt" table.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath));
});
}
return callableTasks;
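
As a side note on the splitting logic above: Guava's "Lists.partition()" fixes the size of each sub-list, not their number, so one extra, smaller sub-list can appear at the end. A minimal, self-contained demonstration of this behaviour (the numbers and class name are made up for the example, not taken from the project):

import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionDemo {
    public static void main(String[] args) {
        // 23 url-reports, aiming for 5 sub-lists of ~20% each.
        List<Integer> urlReports = IntStream.rangeClosed(1, 23).boxed().collect(Collectors.toList());
        int sizeOfEachSubList = (int) (urlReports.size() * 0.2); // = 4
        List<List<Integer>> subLists = Lists.partition(urlReports, sizeOfEachSubList);
        // Prints 6 sub-lists: five of size 4 and one trailing sub-list of size 3,
        // i.e. the "1 more with very few elements" mentioned in the comments above.
        subLists.forEach(subList -> System.out.println(subList.size() + " -> " + subList));
    }
}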