forked from lsmyrnaios/UrlsController
- Fix double active "@Scheduled" annotation for the "ScheduledTasks.updatePrometheusMetrics()" method.
- Code polishing.
This commit is contained in:
parent 8dfb58ee63
commit b94c35c66e
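For context on the first fix: Spring allows "@Scheduled" to be repeated on a method, and every active annotation registers its own trigger, so the old code ran "updatePrometheusMetrics()" both every 6 hours and every 20 minutes. The sketch below illustrates the before/after under that assumption; the class shown here is a simplified stand-in, not the real ScheduledTasks component.

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MetricsScheduleSketch {

	// BEFORE (buggy): both annotations were active, so Spring scheduled this method twice,
	// once with a 6-hour fixed delay and once with a 20-minute fixed delay:
	//   @Scheduled(fixedDelay = 21_600_000)
	//   @Scheduled(initialDelay = 60_000, fixedDelay = 1_200_000)

	// AFTER (fixed): only the production schedule stays active; the testing schedule is kept as a comment.
	@Scheduled(fixedDelay = 21_600_000) // every 6 hours
	//@Scheduled(initialDelay = 60_000, fixedDelay = 1_200_000) // for general testing only
	public void updatePrometheusMetrics() {
		// run the DB-statistics queries and update the registered Prometheus gauges here
	}
}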
@@ -7,7 +7,7 @@ plugins {
 java {
 	group = 'eu.openaire.urls_controller'
 	version = '2.1.0-SNAPSHOT'
-	sourceCompatibility = '1.8'
+	sourceCompatibility = JavaVersion.VERSION_1_8
 }

 repositories {
@@ -178,39 +178,39 @@ public class ScheduledTasks {
 	// Scheduled Metrics for Prometheus.
 	// Prometheus scrapes for metrics usually every 15 seconds, but that is an extremely short time-period for DB-statistics.

-	@Scheduled(fixedDelay = 21_600_000) // Every 6 hours run thw following queries to the database and register the metric.
-	@Scheduled(initialDelay = 60_000, fixedDelay = 1_200_000) // For general testing only.
+	@Scheduled(fixedDelay = 21_600_000) // Every 6 hours run the following queries to the database and register the metric.
+	//@Scheduled(initialDelay = 60_000, fixedDelay = 1_200_000) // For general testing only.
 	//@Scheduled(initialDelay = 60_000, fixedDelay = 120_000) // For debug testing only.
 	public void updatePrometheusMetrics()
 	{
 		ResponseEntity<?> responseEntity = statsController.getNumberOfAllPayloads(true);
 		if ( responseEntity.getStatusCode().value() == 200 ) {
-			numOfAllPayloads.set(Integer.valueOf(responseEntity.getBody().toString())); // (any other cast method fails)
+			numOfAllPayloads.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
 		} // Any error is already logged.

 		responseEntity = statsController.getNumberOfPayloadsAggregatedByServiceThroughCrawling(true);
 		if ( responseEntity.getStatusCode().value() == 200 ) {
-			numOfPayloadsAggregatedByServiceThroughCrawling.set(Integer.valueOf(responseEntity.getBody().toString())); // (any other cast method fails)
+			numOfPayloadsAggregatedByServiceThroughCrawling.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
 		} // Any error is already logged.

 		responseEntity = statsController.getNumberOfPayloadsAggregatedByServiceThroughBulkImport(true);
 		if ( responseEntity.getStatusCode().value() == 200 ) {
-			numOfPayloadsAggregatedByServiceThroughBulkImport.set(Integer.valueOf(responseEntity.getBody().toString())); // (any other cast method fails)
+			numOfPayloadsAggregatedByServiceThroughBulkImport.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
 		} // Any error is already logged.

 		responseEntity = statsController.getNumberOfPayloadsAggregatedByService(true);
 		if ( responseEntity.getStatusCode().value() == 200 ) {
-			numOfPayloadsAggregatedByService.set(Integer.valueOf(responseEntity.getBody().toString())); // (any other cast method fails)
+			numOfPayloadsAggregatedByService.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
 		} // Any error is already logged.

 		responseEntity = statsController.getNumberOfLegacyPayloads(true);
 		if ( responseEntity.getStatusCode().value() == 200 ) {
-			numOfLegacyPayloads.set(Integer.valueOf(responseEntity.getBody().toString())); // (any other cast method fails)
+			numOfLegacyPayloads.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
 		} // Any error is already logged.

 		responseEntity = statsController.getNumberOfRecordsInspectedByServiceThroughCrawling(true);
 		if ( responseEntity.getStatusCode().value() == 200 ) {
-			numOfRecordsInspectedByServiceThroughCrawling.set(Integer.valueOf(responseEntity.getBody().toString())); // (any other cast method fails)
+			numOfRecordsInspectedByServiceThroughCrawling.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
 		} // Any error is already logged.

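A note on the "Integer.valueOf(...)" to "Integer.parseInt(...)" replacements above: both parse the same string, but parseInt returns the primitive int directly, while valueOf returns a boxed Integer that is immediately unboxed again when passed to the gauge's set(int) method, so parseInt avoids a pointless allocation. A minimal illustration; the AtomicInteger here stands in for the metric fields, which are assumed to expose a set(int) method.

import java.util.concurrent.atomic.AtomicInteger;

public class ParseVsValueOf {
	public static void main(String[] args) {
		String body = "12345"; // e.g. the body of a stats response
		AtomicInteger gauge = new AtomicInteger();

		gauge.set(Integer.parseInt(body)); // new code: primitive int, no boxing
		gauge.set(Integer.valueOf(body));  // old code: boxed Integer, auto-unboxed by set(int)

		System.out.println(gauge.get());   // 12345 either way; behaviour is unchanged
	}
}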
@@ -4,7 +4,6 @@ import com.google.common.collect.Lists;
 import eu.openaire.urls_controller.configuration.ImpalaConnector;
 import eu.openaire.urls_controller.models.Error;
 import eu.openaire.urls_controller.models.*;
-import eu.openaire.urls_controller.services.UrlsServiceImpl;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -488,6 +487,7 @@ public class ParquetFileUtils {
 		// So with one request we will create the "parquet_uploads/" and the "parquet_uploads/attempts/" and with the seconds request, the "parquet_uploads/payloads/" directory.

 		logger.info("Going to check if the remote parquet directories exist.");
+		String initErrMsg = "The remote parquet HDFS-directory ";

 		String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;

@@ -512,7 +512,7 @@ public class ParquetFileUtils {
 		}

 		if ( statusCode == 404 ) {
-			logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
+			logger.info(initErrMsg + "\"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
 			attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
 			payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
 			payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
@@ -559,9 +559,9 @@ public class ParquetFileUtils {
 				else if ( dirPath.equals("payloads_bulk_import") )
 					foundPayloadsBulkImportDir = true;
 				else
-					logger.warn("Unknown remote parquet directory found: " + dirPath);
+					logger.warn("Unknown remote parquet HDFS-directory found: " + dirPath);
 			}
-		} catch ( JSONException je ) { // In case any of the above "json-keys" was not found.
+		} catch (JSONException je) { // In case any of the above "json-keys" was not found.
 			logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
 			return false;
 		}
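The catch-block above guards the parsing of the WebHDFS LISTSTATUS response, whose JSON nests the directory entries under "FileStatuses" -> "FileStatus" -> "pathSuffix"; if any of those keys is missing, org.json throws a JSONException and the method bails out. A hedged sketch of that lookup follows (the sample response and print statements are illustrative, not the actual ParquetFileUtils code).

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class ListStatusParseSketch {
	public static void main(String[] args) {
		// Sample of what the "?op=LISTSTATUS" request returns.
		String jsonResponse = "{\"FileStatuses\":{\"FileStatus\":[{\"pathSuffix\":\"attempts\",\"type\":\"DIRECTORY\"}]}}";
		try {
			JSONArray fileStatuses = new JSONObject(jsonResponse)
					.getJSONObject("FileStatuses")
					.getJSONArray("FileStatus");
			for ( int i = 0; i < fileStatuses.length(); i++ ) {
				String dirPath = fileStatuses.getJSONObject(i).getString("pathSuffix");
				System.out.println("Found remote sub-directory: " + dirPath);
			}
		} catch (JSONException je) { // any missing json-key lands here, as in the patched method
			System.err.println("Unexpected LISTSTATUS response: " + je.getMessage());
		}
	}
}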
@@ -571,22 +571,22 @@ public class ParquetFileUtils {

 		// For each missing subdirectories, run the mkDirs-request.
 		if ( !foundAttemptsDir ) {
-			logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
+			logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
 			attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
 		} else
-			logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
+			logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" exists.");

 		if ( !foundPayloadsAggregatedDir ) {
-			logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
+			logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
 			payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
 		} else
-			logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
+			logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");

 		if ( !foundPayloadsBulkImportDir ) {
-			logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
+			logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
 			payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
 		} else
-			logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
+			logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");

 		return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
 		// We need all directories to be created in order for the app to function properly!
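For reference, the mkDirs requests issued through "applyHDFOperation(webHDFSBaseUrl + ... + mkDirsAndParams)" correspond to WebHDFS's MKDIRS operation, an HTTP PUT that answers with a small JSON body containing a boolean. A minimal sketch under that assumption; the URL, user and path values are illustrative, and the real applyHDFOperation() implementation is not part of this diff.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsMkDirsSketch {
	public static void main(String[] args) throws IOException {
		// Illustrative values; the real ones come from the application's configuration.
		String webHDFSBaseUrl = "http://namenode:50070/webhdfs/v1";
		String directoryPath = "/parquet_uploads/attempts/";
		String mkDirsAndParams = "?op=MKDIRS&user.name=hdfsuser";

		HttpURLConnection conn = (HttpURLConnection) new URL(webHDFSBaseUrl + directoryPath + mkDirsAndParams).openConnection();
		conn.setRequestMethod("PUT"); // MKDIRS is a PUT operation in the WebHDFS REST API
		int statusCode = conn.getResponseCode();
		conn.disconnect();

		// On success WebHDFS returns HTTP 200 with a {"boolean": true} body.
		System.out.println("MKDIRS returned HTTP " + statusCode);
	}
}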