- Optimize the "insertAssignmentsQuery".

- Add documentation about the Prometheus Metrics in the README.
- Update Dependencies.
- Code polishing.
Lampros Smyrnaios 2023-07-05 17:10:30 +03:00
parent a89abe3f2f
commit e8644cb64f
8 changed files with 34 additions and 19 deletions

View File

@@ -27,7 +27,7 @@ For interacting with the database we use [**Impala**](https://impala.apache.org/
 - "**getNumberOfPayloadsAggregatedByService**" endpoint: **http://\<IP\>:\<PORT\>/api/stats/getNumberOfPayloadsAggregatedByService** <br>
   This endpoint returns the number of payloads aggregated by the PDF-Aggregated-Service itself, both through crawling and bulk-import procedures.
 - "**getNumberOfLegacyPayloads**" endpoint: **http://\<IP\>:\<PORT\>/api/stats/getNumberOfLegacyPayloads** <br>
-  This endpoint returns the number of payloads which were aggregated by methods other thant the PDF Aggregation Service.
+  This endpoint returns the number of payloads which were aggregated by methods other than the PDF Aggregation Service.
 - "**getNumberOfPayloadsForDatasource**" endpoint: **http://\<IP\>:\<PORT\>/api/stats/getNumberOfPayloadsForDatasource?datasourceId=\<givenDatasourceId\>** <br>
   This endpoint returns the number of payloads which belong to the datasource specified by the given datasourceID.
 - "**getNumberOfRecordsInspectedByServiceThroughCrawling**" endpoint: **http://\<IP\>:\<PORT\>/api/stats/getNumberOfRecordsInspectedByServiceThroughCrawling** <br>
@@ -47,6 +47,19 @@ Note: The Shutdown Service API is accessible by the Controller's host machine.
 <br>
 <br>
+**Prometheus Metrics**:
+- "**numOfAllPayloads**"
+- "**numOfPayloadsAggregatedByServiceThroughCrawling**"
+- "**numOfPayloadsAggregatedByServiceThroughBulkImport**"
+- "**numOfPayloadsAggregatedByService**"
+- "**numOfLegacyPayloads**"
+- "**numOfRecordsInspectedByServiceThroughCrawling**"
+- "**getAssignments_time_seconds_max**": Time taken to return the assignments.
+- "**addWorkerReport_time_seconds**": Time taken to add the WorkerReport.
+<br>
+<br>
 **To install and run the application**:
 - Run ```git clone``` and then ```cd UrlsController```.
 - Set the preferable values inside the [__application.yml__](https://code-repo.d4science.org/lsmyrnaios/UrlsController/src/branch/master/src/main/resources/application.yml) file.
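
The gauge metrics listed above are registered by ScheduledTasks (see that file's hunk further down), while timer series such as "getAssignments_time_seconds_max" are the standard Prometheus rendering of a Micrometer Timer. As a hedged illustration only — the class, path, and wiring below are hypothetical, not taken from this repository — such a timer can be produced with Micrometer's @Timed annotation on a Spring handler method:

```java
import io.micrometer.core.annotation.Timed;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TimedEndpointSketch { // hypothetical class, for illustration only

    // On a Spring Boot app with the Micrometer Prometheus registry, a @Timed
    // handler method is rendered on the Prometheus scrape endpoint as the
    // "getAssignments_time_seconds_count", "..._sum" and "..._max" series.
    @Timed(value = "getAssignments_time", description = "Time taken to return the assignments.")
    @GetMapping("/api/urls") // hypothetical path; the real endpoint is not shown in this diff
    public ResponseEntity<String> getAssignments() {
        return ResponseEntity.ok("assignments");
    }
}
```

With the Spring Boot actuator, such series are then scraped from the Prometheus endpoint (commonly /actuator/prometheus, if exposed).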

View File

@@ -1,12 +1,14 @@
 plugins {
-    id 'org.springframework.boot' version '2.7.12'
+    id 'org.springframework.boot' version '2.7.13'
     id 'io.spring.dependency-management' version '1.1.0'
     id 'java'
 }

-group = 'eu.openaire.urls_controller'
-version = '2.1.0-SNAPSHOT'
-sourceCompatibility = '1.8'
+java {
+    group = 'eu.openaire.urls_controller'
+    version = '2.1.0-SNAPSHOT'
+    sourceCompatibility = '1.8'
+}

 repositories {
     mavenCentral()
@@ -41,7 +43,7 @@ dependencies {
     //implementation group: 'jakarta.validation', name: 'jakarta.validation-api', version: '3.0.2'

     // https://mvnrepository.com/artifact/com.google.guava/guava
-    implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
+    implementation group: 'com.google.guava', name: 'guava', version: '32.1.1-jre'

     // https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
     implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
@@ -49,7 +51,7 @@ dependencies {
     // https://mvnrepository.com/artifact/org.apache.commons/commons-compress
     implementation 'org.apache.commons:commons-compress:1.23.0'

-    implementation 'io.minio:minio:8.5.3'
+    implementation 'io.minio:minio:8.5.4'

     // https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
     implementation("com.cloudera.impala:jdbc:2.5.31") {
@@ -75,7 +77,7 @@ dependencies {
     implementation('org.apache.parquet:parquet-avro:1.13.1')

     // https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common
-    implementation('org.apache.hadoop:hadoop-common:3.3.5') {
+    implementation('org.apache.hadoop:hadoop-common:3.3.6') {
         exclude group: 'org.apache.parquet', module: 'parquet-avro'
         exclude group: 'org.apache.avro', module: 'avro'
         exclude group: 'org.slf4j', module: 'slf4j-api'
@@ -91,7 +93,7 @@ dependencies {
     }

     // https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core
-    implementation('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.5') {
+    implementation('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.6') {
         exclude group: 'org.apache.parquet', module: 'parquet-avro'
         exclude group: 'org.apache.avro', module: 'avro'
         exclude group: 'org.slf4j', module: 'slf4j-api'
@@ -108,7 +110,7 @@ dependencies {
     implementation 'com.fasterxml.woodstox:woodstox-core:6.5.1'

     // https://mvnrepository.com/artifact/org.json/json
-    implementation 'org.json:json:20230227'
+    implementation 'org.json:json:20230618'

     // https://mvnrepository.com/artifact/com.google.code.gson/gson
     implementation 'com.google.code.gson:gson:2.10.1'

View File

@@ -1,6 +1,6 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
 networkTimeout=10000
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists

View File

@@ -26,7 +26,7 @@ if [[ justInstall -eq 1 && shouldRunInDocker -eq 1 ]]; then
     justInstall=0
 fi

-gradleVersion="8.1.1"
+gradleVersion="8.2"

 if [[ justInstall -eq 0 ]]; then

View File

@@ -55,7 +55,6 @@ public class ScheduledTasks {
     workerReportsDirPath += "/";
     this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later.
     this.statsController = statsController;
-
     registry.gauge("numOfAllPayloads", numOfAllPayloads);
@@ -180,7 +179,8 @@ public class ScheduledTasks {
     // Prometheus scrapes for metrics usually every 15 seconds, but that is an extremely short time-period for DB-statistics.
     @Scheduled(fixedDelay = 21_600_000) // Every 6 hours run thw following queries to the database and register the metric.
-    //@Scheduled(initialDelay = 60_000, fixedDelay = 120_000) // For testing only.
+    @Scheduled(initialDelay = 60_000, fixedDelay = 1_200_000) // For general testing only.
+    //@Scheduled(initialDelay = 60_000, fixedDelay = 120_000) // For debug testing only.
     public void updatePrometheusMetrics()
     {
         ResponseEntity<?> responseEntity = statsController.getNumberOfAllPayloads(true);
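
A note on the gauge pattern in this file: Micrometer gauges only observe a state object (which the registry holds weakly), so ScheduledTasks keeps AtomicLong fields strongly reachable, registers them once via registry.gauge(...), and lets the @Scheduled method above overwrite their values after each round of DB queries. A minimal self-contained sketch of that pattern, using a SimpleMeterRegistry in place of the Spring-injected registry:

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.atomic.AtomicLong;

public class GaugeSketch {
    // The backing AtomicLong must stay strongly reachable; Micrometer only keeps a weak reference.
    private final AtomicLong numOfAllPayloads = new AtomicLong(0);

    public static void main(String[] args) {
        GaugeSketch sketch = new GaugeSketch();
        MeterRegistry registry = new SimpleMeterRegistry();
        // Same registration call as in ScheduledTasks; the gauge reads the AtomicLong lazily on scrape.
        registry.gauge("numOfAllPayloads", sketch.numOfAllPayloads);
        // A scheduled task would set the value from a DB-statistics query.
        sketch.numOfAllPayloads.set(42);
        System.out.println(registry.get("numOfAllPayloads").gauge().value()); // prints 42.0
    }
}
```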

View File

@@ -162,7 +162,7 @@ public class UrlsController {
     try {
         Files.createDirectories(currentWorkerReportLocationDir); // No-op if dir exists. It does not throw a "alreadyExistsException"
     } catch (Exception e) {
-        String errorMsg = "Could nor create the \"currentWorkerReportLocationDir\" for worker \"" + curWorkerId + "\" : " + currentWorkerReportLocationDir;
+        String errorMsg = "Could not create the \"currentWorkerReportLocationDir\" for worker \"" + curWorkerId + "\" : " + currentWorkerReportLocationDir;
         logger.error(errorMsg, e);
         return ResponseEntity.internalServerError().body(errorMsg);
     }

View File

@@ -202,13 +202,13 @@ public class UrlsServiceImpl implements UrlsService {
         } else
             return ResponseEntity.status(HttpStatus.MULTI_STATUS).body(new AssignmentsResponse((long) -1, null));
     } else if ( assignmentsSize < assignmentsLimit )
-        logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.");
+        logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + ", for the next requests.");

     logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");

     // Write the Assignment details to the assignment-table.
-    String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', " + timestampMillis + "\n"
-            + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
+    String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + timestampMillis
+            + "\nfrom " + ImpalaConnector.databaseName + ".current_assignment";

     try {
         jdbcTemplate.execute(insertAssignmentsQuery);
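
This is the optimization named in the commit message: the old statement wrapped "current_assignment" in a derived table ("pub_data") only to re-select the same two columns, while the new one inserts straight from "current_assignment", giving Impala one less subquery to plan. A sketch that renders both statements with placeholder values ("exampleDb" and "exampleWorker" stand in for ImpalaConnector.databaseName and the real workerId):

```java
public class InsertAssignmentsQuerySketch {
    public static void main(String[] args) {
        String db = "exampleDb";          // placeholder for ImpalaConnector.databaseName
        String workerId = "exampleWorker"; // placeholder worker-id
        long timestampMillis = System.currentTimeMillis();

        // Old form: a needless derived table around "current_assignment".
        String oldQuery = "insert into " + db + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', " + timestampMillis + "\n"
                + "from (\n select pubid, url from " + db + ".current_assignment) as pub_data";

        // New form: select the columns directly from "current_assignment".
        String newQuery = "insert into " + db + ".assignment \n select pubid, url, '" + workerId + "', " + timestampMillis
                + "\nfrom " + db + ".current_assignment";

        System.out.println("Before:\n" + oldQuery + "\n\nAfter:\n" + newQuery);
    }
}
```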

View File

@@ -703,7 +703,7 @@ public class FileUtils {
     }

-    Lock fileWriteLock = new ReentrantLock(true);
+    private static final Lock fileWriteLock = new ReentrantLock(true);

     public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
     {
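
On this last hunk: declaring the lock as "private static final" ties one fair ReentrantLock to the class rather than to individual FileUtils instances, hides it from other classes, and prevents reassignment. A hypothetical sketch of a lock-guarded write in the shape of writeToFile — the method's real body is not part of this diff, so the details below are assumptions:

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class WriteLockSketch {
    // One fair lock shared by all writers, mirroring the new declaration in FileUtils.
    private static final Lock fileWriteLock = new ReentrantLock(true);

    // Hypothetical shape of a lock-guarded write; returns an error message, or null on success.
    public static String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads) {
        if (shouldLockThreads)
            fileWriteLock.lock();
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileFullPath, true))) {
            writer.write(stringToWrite);
            return null;
        } catch (IOException e) {
            return "Failed to write to file: " + fileFullPath;
        } finally {
            if (shouldLockThreads)
                fileWriteLock.unlock();
        }
    }
}
```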