forked from lsmyrnaios/UrlsController
parent
f183df276b
commit
8607594f6d
|
@ -106,7 +106,7 @@ dependencies {
|
||||||
// https://mvnrepository.com/artifact/org.json/json
|
// https://mvnrepository.com/artifact/org.json/json
|
||||||
implementation 'org.json:json:20220924'
|
implementation 'org.json:json:20220924'
|
||||||
|
|
||||||
testImplementation group: 'org.springframework.security', name: 'spring-security-test'
|
testImplementation 'org.springframework.security:spring-security-test'
|
||||||
testImplementation "org.springframework.boot:spring-boot-starter-test"
|
testImplementation "org.springframework.boot:spring-boot-starter-test"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -113,32 +113,14 @@ public class UrlController {
|
||||||
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
|
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
|
||||||
//logger.debug(findAssignmentsQuery); // DEBUG!
|
//logger.debug(findAssignmentsQuery); // DEBUG!
|
||||||
|
|
||||||
String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
|
|
||||||
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
|
|
||||||
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
|
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
|
||||||
|
|
||||||
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
|
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
|
||||||
|
|
||||||
ImpalaConnector.databaseLock.lock();
|
ImpalaConnector.databaseLock.lock();
|
||||||
|
|
||||||
try {
|
String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
|
||||||
jdbcTemplate.execute(createCurrentAssignmentsQuery);
|
if ( errorMsg != null ) {
|
||||||
} catch (Exception e) {
|
|
||||||
String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
|
|
||||||
String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
|
|
||||||
if ( tmpErrMsg != null )
|
|
||||||
errorMsg += "\n" + tmpErrMsg;
|
|
||||||
ImpalaConnector.databaseLock.unlock();
|
|
||||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
|
|
||||||
} catch (Exception e) {
|
|
||||||
String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
|
|
||||||
String tmpErrMsg = dropCurrentAssignmentTable();
|
|
||||||
if ( tmpErrMsg != null )
|
|
||||||
errorMsg += "\n" + tmpErrMsg;
|
|
||||||
ImpalaConnector.databaseLock.unlock();
|
ImpalaConnector.databaseLock.unlock();
|
||||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||||
}
|
}
|
||||||
|
@ -164,7 +146,7 @@ public class UrlController {
|
||||||
assignments.add(assignment);
|
assignments.add(assignment);
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
|
errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
|
||||||
String tmpErrMsg = dropCurrentAssignmentTable();
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
||||||
if ( tmpErrMsg != null )
|
if ( tmpErrMsg != null )
|
||||||
errorMsg += "\n" + tmpErrMsg;
|
errorMsg += "\n" + tmpErrMsg;
|
||||||
|
@ -174,7 +156,7 @@ public class UrlController {
|
||||||
|
|
||||||
int assignmentsSize = assignments.size();
|
int assignmentsSize = assignments.size();
|
||||||
if ( assignmentsSize == 0 ) {
|
if ( assignmentsSize == 0 ) {
|
||||||
String errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
|
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
|
||||||
logger.error(errorMsg);
|
logger.error(errorMsg);
|
||||||
String tmpErrMsg = dropCurrentAssignmentTable();
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
||||||
ImpalaConnector.databaseLock.unlock();
|
ImpalaConnector.databaseLock.unlock();
|
||||||
|
@ -196,7 +178,7 @@ public class UrlController {
|
||||||
try {
|
try {
|
||||||
jdbcTemplate.execute(insertAssignmentsQuery);
|
jdbcTemplate.execute(insertAssignmentsQuery);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
|
errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
|
||||||
String tmpErrMsg = dropCurrentAssignmentTable();
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
||||||
if ( tmpErrMsg != null )
|
if ( tmpErrMsg != null )
|
||||||
errorMsg += "\n" + tmpErrMsg;
|
errorMsg += "\n" + tmpErrMsg;
|
||||||
|
@ -204,7 +186,7 @@ public class UrlController {
|
||||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
String errorMsg = dropCurrentAssignmentTable();
|
errorMsg = dropCurrentAssignmentTable();
|
||||||
if ( errorMsg != null ) {
|
if ( errorMsg != null ) {
|
||||||
ImpalaConnector.databaseLock.unlock();
|
ImpalaConnector.databaseLock.unlock();
|
||||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||||
|
@ -300,9 +282,6 @@ public class UrlController {
|
||||||
|
|
||||||
// Load all the parquet files of each type into its table.
|
// Load all the parquet files of each type into its table.
|
||||||
ImpalaConnector.databaseLock.lock();
|
ImpalaConnector.databaseLock.lock();
|
||||||
// Important note: It may be possible for a thread to acquire the lock and load its own and another thread's data into the table.
|
|
||||||
// So when the other thread acquire the lock, it will load ZERO data.
|
|
||||||
// That's ok, and we do not need to add any check if the remote data exist, since this process happens in milliseconds. (so a few milliseconds will be wasted for no data)
|
|
||||||
errorMsgAttempts = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
|
errorMsgAttempts = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
|
||||||
errorMsgPayloads = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
|
errorMsgPayloads = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
|
||||||
ImpalaConnector.databaseLock.unlock();
|
ImpalaConnector.databaseLock.unlock();
|
||||||
|
@ -351,7 +330,7 @@ public class UrlController {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
|
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
|
||||||
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
|
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
|
||||||
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
|
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
|
||||||
mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
|
mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
|
||||||
|
@ -370,6 +349,34 @@ public class UrlController {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
|
||||||
|
{
|
||||||
|
String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
|
||||||
|
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
|
||||||
|
|
||||||
|
try {
|
||||||
|
jdbcTemplate.execute(createCurrentAssignmentsQuery);
|
||||||
|
} catch (Exception e) {
|
||||||
|
String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
|
||||||
|
String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
|
||||||
|
if ( tmpErrMsg != null )
|
||||||
|
errorMsg += "\n" + tmpErrMsg;
|
||||||
|
return errorMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
|
||||||
|
} catch (Exception e) {
|
||||||
|
String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
|
||||||
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
||||||
|
if ( tmpErrMsg != null )
|
||||||
|
errorMsg += "\n" + tmpErrMsg;
|
||||||
|
return errorMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null; // All good.
|
||||||
|
}
|
||||||
|
|
||||||
private String dropCurrentAssignmentTable() {
|
private String dropCurrentAssignmentTable() {
|
||||||
String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
|
String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
package eu.openaire.urls_controller.util;
|
package eu.openaire.urls_controller.util;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -15,11 +13,10 @@ import java.util.zip.ZipInputStream;
|
||||||
@Component
|
@Component
|
||||||
public class FileUnZipper {
|
public class FileUnZipper {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(FileUnZipper.class);
|
|
||||||
|
|
||||||
public void unzipFolder(Path source, Path target) throws Exception {
|
public void unzipFolder(Path source, Path target) throws Exception {
|
||||||
try ( ZipInputStream zis = new ZipInputStream(Files.newInputStream(source.toFile().toPath())) ) {
|
try ( ZipInputStream zis = new ZipInputStream(Files.newInputStream(source.toFile().toPath())) ) {
|
||||||
// Iterate over the files in zip and un-zip them.
|
// Iterate over the files in zip and unzip them.
|
||||||
ZipEntry zipEntry = zis.getNextEntry();
|
ZipEntry zipEntry = zis.getNextEntry();
|
||||||
while ( zipEntry != null ) {
|
while ( zipEntry != null ) {
|
||||||
Path targetPath = zipSlipProtect(zipEntry, target);
|
Path targetPath = zipSlipProtect(zipEntry, target);
|
||||||
|
|
|
@ -114,7 +114,7 @@ public class FileUtils {
|
||||||
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1" ;
|
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1" ;
|
||||||
final int[] hashArgType = new int[] {Types.VARCHAR};
|
final int[] hashArgType = new int[] {Types.VARCHAR};
|
||||||
|
|
||||||
ImpalaConnector.databaseLock.lock();
|
ImpalaConnector.databaseLock.lock(); // The following loop uses the database.
|
||||||
|
|
||||||
for ( UrlReport urlReport : urlReports )
|
for ( UrlReport urlReport : urlReports )
|
||||||
{
|
{
|
||||||
|
@ -230,7 +230,7 @@ public class FileUtils {
|
||||||
fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
|
fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
|
||||||
|
|
||||||
String[] fileNames = new File(targetDirectory).list();
|
String[] fileNames = new File(targetDirectory).list();
|
||||||
if ( (fileNames == null) || (fileNames.length <= 1) ) { // The directory might have only one file, the "zip-file".
|
if ( (fileNames == null) || (fileNames.length <= 1) ) { // The directory might have only one file, the "zip-file", if the full-texts failed to be unzipped..
|
||||||
logger.error("No full-text fileNames where extracted from directory: " + targetDirectory);
|
logger.error("No full-text fileNames where extracted from directory: " + targetDirectory);
|
||||||
failedBatches ++;
|
failedBatches ++;
|
||||||
continue; // To the next batch.
|
continue; // To the next batch.
|
||||||
|
@ -489,13 +489,17 @@ public class FileUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public boolean deleteDirectory(File curBatchDir) {
|
public boolean deleteDirectory(File curBatchDir)
|
||||||
|
{
|
||||||
try {
|
try {
|
||||||
org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
|
org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
|
||||||
return true;
|
return true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("The following directory could not be deleted: " + curBatchDir.getName(), e);
|
logger.error("The following directory could not be deleted: " + curBatchDir.getName(), e);
|
||||||
return false;
|
return false;
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
logger.error("This batch-dir does not exist: " + curBatchDir.getName());
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue