- In test-environment mode, check for already-existing file-hashes only in the "payload_aggregated" table, instead of the whole "payload" view. This makes investigating false-positive docUrls easier, since we avoid checking against the millions of "legacy" payloads.
- Improve performance in production by not building the string arguments of "trace"-logs when trace-logging is disabled.
parent 8381df70c6
commit 9412391903
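In code terms, the two changes amount to (a) switching the hash-lookup query to the "payload_aggregated" table when running in the test environment and (b) wrapping trace-level log statements in an isTraceEnabled() check. Below is a minimal, self-contained sketch of both patterns; the class name, the DATABASE_NAME constant and the constructor wiring are illustrative stand-ins, not the project's exact code (the real code lives in FileUtils and uses ImpalaConnector.databaseName).

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TraceGuardAndTestTableSketch {

        private static final Logger logger = LoggerFactory.getLogger(TraceGuardAndTestTableSketch.class);

        private static final String DATABASE_NAME = "example_db";  // stand-in for ImpalaConnector.databaseName
        private final boolean isTestEnvironment;

        public TraceGuardAndTestTableSketch(boolean isTestEnvironment) {
            this.isTestEnvironment = isTestEnvironment;
        }

        // In the test environment, look up file-hashes only in the (much smaller)
        // "payload_aggregated" table, instead of the full "payload" view.
        public String buildFileLocationForHashQuery() {
            return "select `location` from " + DATABASE_NAME + ".payload"
                    + (isTestEnvironment ? "_aggregated" : "")
                    + " where `hash` = ? limit 1";
        }

        // Guard trace-logs, so the concatenated message string is never built
        // when the TRACE level is disabled (the normal case in production).
        public void logQuery(String query) {
            if (logger.isTraceEnabled())
                logger.trace("getFileLocationForHashQuery:\n" + query);
        }
    }

Since TRACE is normally off in production, the guard avoids allocating the (sometimes large) concatenated log strings altogether.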
@@ -58,7 +58,10 @@ public class StatsController {
         final String getPayloadsNumberForDatasourceQuery =
                 "select count(p.id) from " + ImpalaConnector.databaseName + ".payload p\n" +
                 " join " + ImpalaConnector.databaseName + ".publication pu on pu.id=p.id and pu.datasourceid=\"" + datasourceId + "\"";
-        logger.trace("getPayloadsNumberForDatasourceQuery:\n" + getPayloadsNumberForDatasourceQuery);
+
+        if ( logger.isTraceEnabled() )
+            logger.trace("getPayloadsNumberForDatasourceQuery:\n" + getPayloadsNumberForDatasourceQuery);
+
         return statsService.getNumberOfPayloads(getPayloadsNumberForDatasourceQuery, "payloads related to datasourceId \"" + datasourceId + "\"");
     }
@@ -100,7 +100,8 @@ public class FullTextsServiceImpl implements FullTextsService {
             return false;
         }

-        logger.trace("fileLocations:\n" + fileLocations);
+        if ( logger.isTraceEnabled() )
+            logger.trace("fileLocations:\n" + fileLocations);

         String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir;  // This ends with "/".
         try {
@@ -268,7 +269,9 @@ public class FullTextsServiceImpl implements FullTextsService {
             // Construct the parquet file, upload it to HDFS and load them it in the "payload_bulk_import" table.
             String parquetFileName = "payloads_" + segmentCounter + ".parquet";
             String fullLocalParquetFilePath = localParquetDir + parquetFileName;
-            logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath);  // DEBUG!
+
+            if ( logger.isTraceEnabled() )
+                logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath);  // DEBUG!

             if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) {
                 bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: '" + parquetFileName + "'!");
@@ -276,9 +279,9 @@ public class FullTextsServiceImpl implements FullTextsService {
                 // None of the files of this segment will be deleted, in any case.
                 return numOfFilesInSegment;
             }
-            //logger.trace("Parquet file '" + parquetFileName + "' was created and filled.");  // DEBUG!

-            logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS.");  // DEBUG!
+            if ( logger.isTraceEnabled() )
+                logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS.");  // DEBUG!

             // Upload and insert the data to the "payload" Impala table. (no database-locking is required)
             String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);
@@ -289,7 +292,8 @@ public class FullTextsServiceImpl implements FullTextsService {
                 return numOfFilesInSegment;
             }

-            logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\".");  // DEBUG!
+            if ( logger.isTraceEnabled() )
+                logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\".");  // DEBUG!

             ImpalaConnector.databaseLock.lock();
             if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
@@ -371,7 +375,6 @@ public class FullTextsServiceImpl implements FullTextsService {

         // openaire id = <datasourcePrefix> + "::" + <md5(lowercase(arxivId))>
         String openAireId = (datasourcePrefix + "::" + idMd5hash);
-        //logger.trace("openAireId: " + openAireId);

         String s3Url = null;

@@ -68,7 +68,8 @@ public class UrlsServiceImpl implements UrlsService {
         if ( bulkImportSources.isEmpty() )
             return;  // So the "excludedDatasourceIDsStringList" -code should be placed last in this Constructor-method.

-        logger.trace("BulkImportSources:\n" + bulkImportSources);
+        if ( logger.isTraceEnabled() )
+            logger.trace("BulkImportSources:\n" + bulkImportSources);

         List<String> excludedIDs = new ArrayList<>();
         for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) {
@@ -128,7 +129,7 @@ public class UrlsServiceImpl implements UrlsService {


         // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
-        logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery);  // DEBUG!
+        //logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery);  // DEBUG!

         final String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";

@@ -58,8 +58,10 @@ public class FileUtils {

     public static final String workingDir = System.getProperty("user.dir") + File.separator;

+    private boolean isTestEnvironment;
+

-    public FileUtils (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation) {
+    public FileUtils (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation, @Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment) {
         if ( !baseFilesLocation.endsWith(File.separator) )
             baseFilesLocation += File.separator;

@@ -67,6 +69,8 @@ public class FileUtils {
             baseFilesLocation = workingDir + baseFilesLocation;

         this.baseFilesLocation = baseFilesLocation;
+
+        this.isTestEnvironment = isTestEnvironment;
     }


@@ -139,7 +143,7 @@ public class FileUtils {

         SetMultimap<String, Payload> allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((urlReportsSize / 5), 3));  // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.

-        final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1";
+        final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload" + (isTestEnvironment ? "_aggregated" : "") + " where `hash` = ? limit 1";
         final int[] hashArgType = new int[] {Types.VARCHAR};

         List<Callable<Void>> callableTasks = new ArrayList<>(6);
@@ -174,7 +178,8 @@ public class FileUtils {

                 if ( alreadyFoundFileLocation != null ) { // If the full-text of this record is already-found and uploaded.
                     payload.setLocation(alreadyFoundFileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
-                    //logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
+                    if ( logger.isTraceEnabled() )
+                        logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
                     numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
                     numFullTextsFound.incrementAndGet();
                     return null;  // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled my the "setFullTextForMultiplePayloads()" method, later.
@ -447,7 +452,6 @@ public class FileUtils {
|
||||||
logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName);
|
logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( hash == null ) {
|
if ( hash == null ) {
|
||||||
logger.error("The retrieved \"hash\" was \"null\" for file: " + fileName);
|
logger.error("The retrieved \"hash\" was \"null\" for file: " + fileName);
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@@ -414,7 +414,7 @@ public class ParquetFileUtils {
             conn.setInstanceFollowRedirects(true);  // It is possible that the "location" was an intermediate one.
             conn.connect();

-            // Write the parquet file.
+            // Upload the parquet file.
             try ( BufferedInputStream inputS = new BufferedInputStream(Files.newInputStream(parquetFile.toPath()), FileUtils.tenMb);
                 BufferedOutputStream outS = new BufferedOutputStream(conn.getOutputStream(), FileUtils.tenMb) )
             {
@@ -604,7 +604,8 @@ public class ParquetFileUtils {
                 logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
                 return false;
             }
-            logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false));
+            if ( logger.isTraceEnabled() )
+                logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false));
         } catch (Exception e) {
             logger.error("", e);
             return false;