- Add a missing change.

- Code optimization and polishing.
- Update dependencies.
This commit is contained in:
Lampros Smyrnaios 2023-10-04 16:17:12 +03:00
parent 7019f7c3c7
commit 96c11ba4b8
9 changed files with 75 additions and 45 deletions

View File

@ -52,9 +52,9 @@ dependencies {
implementation("org.apache.commons:commons-compress:1.24.0") { implementation("org.apache.commons:commons-compress:1.24.0") {
exclude group: 'com.github.luben', module: 'zstd-jni' exclude group: 'com.github.luben', module: 'zstd-jni'
} }
implementation 'com.github.luben:zstd-jni:1.5.5-5' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often. implementation 'com.github.luben:zstd-jni:1.5.5-6' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often.
implementation 'io.minio:minio:8.5.5' implementation 'io.minio:minio:8.5.6'
// https://mvnrepository.com/artifact/com.cloudera.impala/jdbc // https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
implementation("com.cloudera.impala:jdbc:2.5.31") { implementation("com.cloudera.impala:jdbc:2.5.31") {

View File

@ -111,7 +111,7 @@ public class ScheduledTasks {
// Add check for the value, if wanted.. (we don't care at the moment) // Add check for the value, if wanted.. (we don't care at the moment)
} catch (ExecutionException ee) { } catch (ExecutionException ee) {
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP). String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage); logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + GenericUtils.endOfLine + stackTraceMessage);
} catch (CancellationException ce) { } catch (CancellationException ce) {
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage()); logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage());
} catch (IndexOutOfBoundsException ioobe) { } catch (IndexOutOfBoundsException ioobe) {
@ -375,13 +375,13 @@ public class ScheduledTasks {
private boolean processFailedWorkerReport(File workerReportFile, String workerReportName) private boolean processFailedWorkerReport(File workerReportFile, String workerReportName)
{ {
logger.debug("Going to load and parse the workerReport: " + workerReportName); logger.debug("Going to load and parse the failed workerReport: " + workerReportName);
// Load the file's json content into a "WorkerReport" object. // Load the file's json content into a "WorkerReport" object.
try ( BufferedReader bfRead = new BufferedReader(new FileReader(workerReportFile), FileUtils.halfMb) ) { try ( BufferedReader bfRead = new BufferedReader(new FileReader(workerReportFile), FileUtils.halfMb) ) {
String line; String line;
while ( (line = bfRead.readLine()) != null ) // The line, without any line-termination-characters. while ( (line = bfRead.readLine()) != null ) // The line, without any line-termination-characters.
jsonStringBuilder.append(line).append("\n"); jsonStringBuilder.append(line).append(GenericUtils.endOfLine);
} catch (Exception e) { } catch (Exception e) {
logger.error("Problem when acquiring the contents of workerReport \"" + workerReportName + "\""); logger.error("Problem when acquiring the contents of workerReport \"" + workerReportName + "\"");
jsonStringBuilder.setLength(0); // Reset the StringBuilder without de-allocating. jsonStringBuilder.setLength(0); // Reset the StringBuilder without de-allocating.

View File

@ -57,15 +57,15 @@ public class BulkImportController {
public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport) public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport)
{ {
String bulkImportReportLocation1; String bulkImportReportLocationTemp;
this.baseBulkImportLocation = bulkImport.getBaseBulkImportLocation(); this.baseBulkImportLocation = bulkImport.getBaseBulkImportLocation();
this.bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources()); this.bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());
bulkImportReportLocation1 = bulkImport.getBulkImportReportLocation(); bulkImportReportLocationTemp = bulkImport.getBulkImportReportLocation();
if ( !bulkImportReportLocation1.endsWith("/") ) if ( !bulkImportReportLocationTemp.endsWith("/") )
bulkImportReportLocation1 += "/"; bulkImportReportLocationTemp += "/";
this.bulkImportReportLocation = bulkImportReportLocation1; this.bulkImportReportLocation = bulkImportReportLocationTemp;
this.bulkImportService = bulkImportService; this.bulkImportService = bulkImportService;
@ -90,8 +90,8 @@ public class BulkImportController {
@PostMapping("bulkImportFullTexts") @PostMapping("bulkImportFullTexts")
public ResponseEntity<?> bulkImportFullTexts(@RequestParam String provenance, @RequestParam String bulkImportDir, @RequestParam boolean shouldDeleteFilesOnFinish) { public ResponseEntity<?> bulkImportFullTexts(@RequestParam String provenance, @RequestParam String bulkImportDir, @RequestParam boolean shouldDeleteFilesOnFinish)
{
BulkImport.BulkImportSource bulkImportSource = bulkImportSources.get(provenance); BulkImport.BulkImportSource bulkImportSource = bulkImportSources.get(provenance);
if ( bulkImportSource == null ) { if ( bulkImportSource == null ) {
String errorMsg = "The provided provenance \"" + provenance + "\" is not in the list of the bulk-imported sources, so no configuration-rules are available!"; String errorMsg = "The provided provenance \"" + provenance + "\" is not in the list of the bulk-imported sources, so no configuration-rules are available!";
@ -228,7 +228,7 @@ public class BulkImportController {
FileUtils.fileAccessLock.lock(); FileUtils.fileAccessLock.lock();
try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json"))), FileUtils.twentyFiveKb) ) { try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json"))), FileUtils.twentyFiveKb) ) {
while ( (line = in.readLine()) != null ) while ( (line = in.readLine()) != null )
stringBuilder.append(line).append("\n"); // The "readLine()" does not return the line-term char. stringBuilder.append(line).append(GenericUtils.endOfLine); // The "readLine()" does not return the line-term char.
} catch (NoSuchFileException nsfe) { } catch (NoSuchFileException nsfe) {
logger.warn("The requested report-file with ID: \"" + bulkImportReportId + "\" was not found!"); logger.warn("The requested report-file with ID: \"" + bulkImportReportId + "\" was not found!");
return ResponseEntity.notFound().build(); return ResponseEntity.notFound().build();

View File

@ -60,7 +60,7 @@ public class ShutdownController {
// A scheduler monitors the shutdown of the workers. Once all worker have shutdown, the Controller shuts down as well. // A scheduler monitors the shutdown of the workers. Once all worker have shutdown, the Controller shuts down as well.
} }
return ResponseEntity.ok().body(endingMsg + "\n"); return ResponseEntity.ok().body(endingMsg + GenericUtils.endOfLine);
} }
@ -87,7 +87,7 @@ public class ShutdownController {
logger.warn("Will not post CancelShutdownRequest to Worker \"" + workerId + "\", since it has already shut down."); logger.warn("Will not post CancelShutdownRequest to Worker \"" + workerId + "\", since it has already shut down.");
} }
return ResponseEntity.ok().body(endingMsg + "\n"); return ResponseEntity.ok().body(endingMsg + GenericUtils.endOfLine);
} }
@ -98,7 +98,7 @@ public class ShutdownController {
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) { if ( workerInfo == null ) {
String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service!"; String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service!";
logger.warn(initMsg + "\n" + errorMsg); logger.warn(initMsg + GenericUtils.endOfLine + errorMsg);
return ResponseEntity.badRequest().body(errorMsg); return ResponseEntity.badRequest().body(errorMsg);
} }

View File

@ -103,7 +103,7 @@ public class BulkImportServiceImpl implements BulkImportService {
} }
if ( logger.isTraceEnabled() ) if ( logger.isTraceEnabled() )
logger.trace("fileLocations: " + additionalLoggingMsg + "\n" + fileLocations); logger.trace("fileLocations: " + additionalLoggingMsg + GenericUtils.endOfLine + fileLocations);
String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/". String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/".
try { try {
@ -164,7 +164,7 @@ public class BulkImportServiceImpl implements BulkImportService {
// The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory. // The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory.
} catch (ExecutionException ee) { } catch (ExecutionException ee) {
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP). String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + additionalLoggingMsg + "\n" + stackTraceMessage); logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + additionalLoggingMsg + GenericUtils.endOfLine + stackTraceMessage);
} catch (CancellationException ce) { } catch (CancellationException ce) {
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage() + additionalLoggingMsg); logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage() + additionalLoggingMsg);
} catch (IndexOutOfBoundsException ioobe) { } catch (IndexOutOfBoundsException ioobe) {

View File

@ -6,6 +6,7 @@ import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils; import eu.openaire.urls_controller.util.ParquetFileUtils;
import io.micrometer.core.annotation.Timed; import io.micrometer.core.annotation.Timed;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -128,8 +129,8 @@ public class UrlsServiceImpl implements UrlsService {
" on existing.original_url=pu.url\n" + " on existing.original_url=pu.url\n" +
" where d.allow_harvest=true\n" + " where d.allow_harvest=true\n" +
((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below. ((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
(" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") + (" and d.id not in " + excludedDatasourceIDsStringList + GenericUtils.endOfLine) : "") +
" and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" + " and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + GenericUtils.endOfLine +
" and not exists (select 1 from " + DatabaseConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + " and not exists (select 1 from " + DatabaseConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
" and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing. " and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
" and (p.year <= " + currentYear + " or p.year > " + (currentYear + 5) + ")\n" + // Exclude the pubs which will be published in the next 5 years. They don't provide full-texts now. (We don't exclude all future pubs, since, some have invalid year, like "9999"). " and (p.year <= " + currentYear + " or p.year > " + (currentYear + 5) + ")\n" + // Exclude the pubs which will be published in the next 5 years. They don't provide full-texts now. (We don't exclude all future pubs, since, some have invalid year, like "9999").
@ -169,7 +170,7 @@ public class UrlsServiceImpl implements UrlsService {
assignment.setAssignmentsBatchCounter(curAssignmentsBatchCounter); assignment.setAssignmentsBatchCounter(curAssignmentsBatchCounter);
assignment.setTimestamp(timestamp); assignment.setTimestamp(timestamp);
Datasource datasource = new Datasource(); Datasource datasource = new Datasource();
try { // For each of the 4 columns returned, do the following. The column-indexing starts from 1 try { // For each of the 4 columns returned, do the following. The column-indexing starts from 1.
assignment.setId(rs.getString(1)); assignment.setId(rs.getString(1));
assignment.setOriginalUrl(rs.getString(2)); assignment.setOriginalUrl(rs.getString(2));
datasource.setId(rs.getString(3)); datasource.setId(rs.getString(3));
@ -186,7 +187,7 @@ public class UrlsServiceImpl implements UrlsService {
errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests."; errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
logger.error(errorMsg); logger.error(errorMsg);
if ( tmpErrMsg != null ) { if ( tmpErrMsg != null ) {
errorMsg += "\n" + tmpErrMsg; // The additional error-msg is already logged. errorMsg += GenericUtils.endOfLine + tmpErrMsg; // The additional error-msg is already logged.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} else } else
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(new AssignmentsResponse((long) -1, null)); return ResponseEntity.status(HttpStatus.NO_CONTENT).body(new AssignmentsResponse((long) -1, null));
@ -195,7 +196,7 @@ public class UrlsServiceImpl implements UrlsService {
DatabaseConnector.databaseLock.unlock(); DatabaseConnector.databaseLock.unlock();
errorMsg = DatabaseConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e); errorMsg = DatabaseConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
if ( tmpErrMsg != null ) if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg; errorMsg += GenericUtils.endOfLine + tmpErrMsg;
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} }
@ -205,7 +206,7 @@ public class UrlsServiceImpl implements UrlsService {
DatabaseConnector.databaseLock.unlock(); DatabaseConnector.databaseLock.unlock();
errorMsg = "Some results were retrieved from the \"getAssignmentsQuery\", but no data could be extracted from them, for worker with id: " + workerId; errorMsg = "Some results were retrieved from the \"getAssignmentsQuery\", but no data could be extracted from them, for worker with id: " + workerId;
if ( tmpErrMsg != null ) if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg; errorMsg += GenericUtils.endOfLine + tmpErrMsg;
logger.error(errorMsg); logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} }
@ -225,7 +226,7 @@ public class UrlsServiceImpl implements UrlsService {
DatabaseConnector.databaseLock.unlock(); DatabaseConnector.databaseLock.unlock();
errorMsg = DatabaseConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e); errorMsg = DatabaseConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
if ( tmpErrMsg != null ) if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg; errorMsg += GenericUtils.endOfLine + tmpErrMsg;
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} }
@ -440,7 +441,7 @@ public class UrlsServiceImpl implements UrlsService {
String errorMsg = DatabaseConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e); String errorMsg = DatabaseConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened). String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
if ( tmpErrMsg != null ) if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg; errorMsg += GenericUtils.endOfLine + tmpErrMsg;
return errorMsg; return errorMsg;
} }
@ -450,7 +451,7 @@ public class UrlsServiceImpl implements UrlsService {
String errorMsg = DatabaseConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e); String errorMsg = DatabaseConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable(); String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null ) if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg; errorMsg += GenericUtils.endOfLine + tmpErrMsg;
return errorMsg; return errorMsg;
} }
@ -546,7 +547,7 @@ public class UrlsServiceImpl implements UrlsService {
logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage()); logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
return false; return false;
} catch (HttpClientErrorException hcee) { } catch (HttpClientErrorException hcee) {
logger.error("The Controller did something wrong when sending the report result to the Worker (" + workerId + "). | url: " + url + "\n" + hcee.getMessage()); logger.error("The Controller did something wrong when sending the report result to the Worker (" + workerId + "). | url: " + url + GenericUtils.endOfLine + hcee.getMessage());
return false; return false;
} catch (Exception e) { } catch (Exception e) {
errorMsg = "Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId; errorMsg = "Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId;
@ -578,7 +579,7 @@ public class UrlsServiceImpl implements UrlsService {
} }
// Only if the file exists, proceed to renaming it. // Only if the file exists, proceed to renaming it.
renamedWorkerReport = new File(workerReportBaseName + ((errorMsg == null) ? "_successful.json" : "_failed.json")); renamedWorkerReport = new File(workerReportBaseName + ((errorMsg == null) ? "_successful.json" : "_failed.json"));
if ( !workerReport.renameTo(renamedWorkerReport) ) { if ( ! workerReport.renameTo(renamedWorkerReport) ) {
logger.warn("There was a problem when renaming the workerReport: " + workerReport.getName()); logger.warn("There was a problem when renaming the workerReport: " + workerReport.getName());
return null; return null;
} }

View File

@ -10,6 +10,7 @@ import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport; import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo; import eu.openaire.urls_controller.models.WorkerInfo;
import org.apache.commons.io.FileDeleteStrategy; import org.apache.commons.io.FileDeleteStrategy;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -352,7 +353,7 @@ public class FileUtils {
curBatchPath = Files.createDirectories(Paths.get(targetDirectory)); curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// The base-directory will be created along with the first batch-directory. // The base-directory will be created along with the first batch-directory.
} catch (Exception e) { } catch (Exception e) {
logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
failedBatches ++; failedBatches ++;
continue; continue;
} }
@ -433,7 +434,7 @@ public class FileUtils {
uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads); uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads);
return true; return true;
} catch (Exception e) { } catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
return false; return false;
} finally { } finally {
deleteDirectory(curBatchPath.toFile()); deleteDirectory(curBatchPath.toFile());
@ -450,6 +451,26 @@ public class FileUtils {
HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection(); HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection();
conn.setRequestMethod("GET"); conn.setRequestMethod("GET");
conn.setRequestProperty("User-Agent", "UrlsController"); conn.setRequestProperty("User-Agent", "UrlsController");
// TODO - Write the fileNames in the RequestBody, so that we can include as many as we want in each request.
// Right now, we can include only up to 70-80 fileNames in the url-string.
// TODO - We need to add the fileNames in the requestBody BEFORE we connect. So we will need to refactor the code to work in that order.
/*OutputStream os = conn.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8);
osw.write(fileNamesForCurBatch_separated_by_comma);
osw.flush();
osw.close();
os.close();*/
// TODO - The above update will also enable us to improve edge-case management, like making sure we do not create a whole new batch for just a few files..
// Check this case for example, where we have one extra batch with the network, compression-decompression, transfer, uploading etc. overhead:
// 2023-02-06 12:17:26.893 [http-nio-1880-exec-8] DEBUG e.o.urls_controller.util.FileUtils.getAndUploadFullTexts(@235) - The assignments_12 have 211 distinct non-already-uploaded fullTexts.
// Going to request them from the Worker "worker_X", in 4 batches (70 files each, except for the final batch, which will have 1 files).
// If we are not limited by the url-length we can easily say that if less than 10 files remain for the last batch, then add them to the previous batch (eg. the last batch will have 79 files)
// If equal to 10 or more files remain, then we will make an extra batch.
conn.connect(); conn.connect();
int statusCode = conn.getResponseCode(); int statusCode = conn.getResponseCode();
if ( statusCode == -1 ) { // Invalid HTTP-Response. if ( statusCode == -1 ) { // Invalid HTTP-Response.
@ -466,7 +487,7 @@ public class FileUtils {
throw re; throw re;
} catch (Exception e) { } catch (Exception e) {
String exMessage = e.getMessage(); String exMessage = e.getMessage();
logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + exMessage); logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + GenericUtils.endOfLine + exMessage);
if ( exMessage.contains("Connection refused") ) { if ( exMessage.contains("Connection refused") ) {
logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!"); logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!");
throw new RuntimeException(); throw new RuntimeException();
@ -600,7 +621,8 @@ public class FileUtils {
} }
public String getMessageFromResponseBody(HttpURLConnection conn, boolean isError) { public String getMessageFromResponseBody(HttpURLConnection conn, boolean isError)
{
final StringBuilder msgStrB = new StringBuilder(500); final StringBuilder msgStrB = new StringBuilder(500);
try ( BufferedReader br = new BufferedReader(new InputStreamReader((isError ? conn.getErrorStream() : conn.getInputStream()))) ) { try ( BufferedReader br = new BufferedReader(new InputStreamReader((isError ? conn.getErrorStream() : conn.getInputStream()))) ) {
String inputLine; String inputLine;
@ -620,7 +642,8 @@ public class FileUtils {
} }
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch) { private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
{
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch); int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch); int endingIndex = (curBatch * numOfFullTextsPerBatch);
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small. if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
@ -631,14 +654,15 @@ public class FileUtils {
try { try {
fileNamesOfCurBatch.add(allFileNames.get(i)); fileNamesOfCurBatch.add(allFileNames.get(i));
} catch (IndexOutOfBoundsException ioobe) { } catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for i=" + i + "\n" + ioobe.getMessage(), ioobe); logger.error("IOOBE for i=" + i + GenericUtils.endOfLine + ioobe.getMessage(), ioobe);
} }
} }
return fileNamesOfCurBatch; return fileNamesOfCurBatch;
} }
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) { private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
{
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50); final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
sb.append(baseUrl); sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size(); int numFullTextsCurBatch = fileNamesForCurBatch.size();
@ -679,7 +703,8 @@ public class FileUtils {
* @param urlReports * @param urlReports
* @param shouldCheckAndKeepS3UploadedFiles * @param shouldCheckAndKeepS3UploadedFiles
*/ */
public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles) { public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles)
{
for ( UrlReport urlReport : urlReports ) { for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload(); Payload payload = urlReport.getPayload();
if ( payload == null ) if ( payload == null )

View File

@ -6,8 +6,12 @@ import java.util.Date;
public class GenericUtils { public class GenericUtils {
public static final String endOfLine = "\n";
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z"); private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z");
public static String getReadableCurrentTimeAndZone() { public static String getReadableCurrentTimeAndZone() {
return (simpleDateFormat.format(new Date(System.currentTimeMillis()))); return (simpleDateFormat.format(new Date(System.currentTimeMillis())));
} }
@ -23,10 +27,10 @@ public class GenericUtils {
StackTraceElement[] stels = thr.getStackTrace(); StackTraceElement[] stels = thr.getStackTrace();
StringBuilder sb = new StringBuilder(numOfLines *100); StringBuilder sb = new StringBuilder(numOfLines *100);
if ( initialMessage != null ) if ( initialMessage != null )
sb.append(initialMessage).append(" Stacktrace:").append("\n"); // This StringBuilder is thread-safe as a local-variable. sb.append(initialMessage).append(" Stacktrace:").append(GenericUtils.endOfLine); // This StringBuilder is thread-safe as a local-variable.
for ( int i = 0; (i < stels.length) && (i <= numOfLines); ++i ) { for ( int i = 0; (i < stels.length) && (i <= numOfLines); ++i ) {
sb.append(stels[i]); sb.append(stels[i]);
if (i < numOfLines) sb.append("\n"); if (i < numOfLines) sb.append(GenericUtils.endOfLine);
} }
return sb.toString(); return sb.toString();
} }

View File

@ -240,7 +240,7 @@ public class ParquetFileUtils {
for ( UrlReport urlReport : urlReports ) { for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload(); Payload payload = urlReport.getPayload();
if ( payload == null ) { if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + "\n" + urlReport); logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + GenericUtils.endOfLine + urlReport);
continue; continue;
} }
@ -289,7 +289,7 @@ public class ParquetFileUtils {
} }
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath, String parquetHDFSDirectoryPathPayloads) public boolean createAndLoadParquetDataIntoPayloadTable(int payloadsCounter, List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath, String parquetHDFSDirectoryPathPayloads)
{ {
List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2)); List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
GenericData.Record record; GenericData.Record record;
@ -298,7 +298,7 @@ public class ParquetFileUtils {
{ {
Payload payload = urlReport.getPayload(); Payload payload = urlReport.getPayload();
if ( payload == null ) { if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + "\n" + urlReport); logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + GenericUtils.endOfLine + urlReport);
continue; continue;
} }
@ -331,7 +331,7 @@ public class ParquetFileUtils {
return false; return false;
} }
String fileName = curReportAssignmentsCounter + "_payloads.parquet"; String fileName = curReportAssignmentsCounter + "_payloads_" + payloadsCounter + ".parquet";
//logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG! //logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = localParquetPath + fileName; String fullFilePath = localParquetPath + fileName;
@ -374,7 +374,7 @@ public class ParquetFileUtils {
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING! } catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
String errorMsg = "Problem when creating the \"ParquetWriter\" object or when writing the records with it!"; String errorMsg = "Problem when creating the \"ParquetWriter\" object or when writing the records with it!";
if ( e instanceof org.apache.hadoop.fs.FileAlreadyExistsException ) if ( e instanceof org.apache.hadoop.fs.FileAlreadyExistsException )
logger.error(errorMsg + "\n" + e.getMessage()); logger.error(errorMsg + GenericUtils.endOfLine + e.getMessage());
else else
logger.error(errorMsg, e); logger.error(errorMsg, e);
@ -645,7 +645,7 @@ public class ParquetFileUtils {
return false; return false;
} }
if ( logger.isTraceEnabled() ) if ( logger.isTraceEnabled() )
logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false)); logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + GenericUtils.endOfLine + fileUtils.getMessageFromResponseBody(conn, false));
} catch (Exception e) { } catch (Exception e) {
logger.error("", e); logger.error("", e);
return false; return false;