- Add info about the Stats API usage in "README.md".
- Optimize performance in "ParquetFileUtils.createAndLoadParquetDataIntoAttemptTable()" and "ParquetFileUtils.createAndLoadParquetDataIntoPayloadTable()". - Handle the "EmptyResultDataAccessException" inside "StatsController". - Optimize gradle's performance. - Code polishing.
This commit is contained in:
parent
bfdf06bd09
commit
e51ee9dd27
|
@ -3,6 +3,12 @@
|
||||||
The Controller's Application receives requests coming from the [Workers](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) , constructs an assignments-list with data received from a database and returns the list to the workers.<br>
|
The Controller's Application receives requests coming from the [Workers](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) , constructs an assignments-list with data received from a database and returns the list to the workers.<br>
|
||||||
Then, it receives the "WorkerReports", it requests the full-texts from the workers, in batches, and uploads them on the S3-Object-Store. Finally, it writes the related reports, along with the updated file-locations into the database.<br>
|
Then, it receives the "WorkerReports", it requests the full-texts from the workers, in batches, and uploads them on the S3-Object-Store. Finally, it writes the related reports, along with the updated file-locations into the database.<br>
|
||||||
The database used is the [Impala](https://impala.apache.org/).<br>
|
The database used is the [Impala](https://impala.apache.org/).<br>
|
||||||
|
<br>
|
||||||
|
Statistics API:
|
||||||
|
- "**getNumberOfPayloads**" endpoint: **http://IP:PORT/api/stats/getNumberOfPayloads**
|
||||||
|
- "**getNumberOfRecordsInspected**" endpoint: **http://IP:PORT/api/stats/getNumberOfRecordsInspected**
|
||||||
|
<br>
|
||||||
|
|
||||||
<br>
|
<br>
|
||||||
To install and run the application:
|
To install and run the application:
|
||||||
- Run ```git clone``` and then ```cd UrlsController```.
|
- Run ```git clone``` and then ```cd UrlsController```.
|
||||||
|
|
|
@ -116,7 +116,7 @@ configurations.implementation {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set increased lower and upper limits for the java-execution.
|
// Set increased lower and upper limits for the java-execution.
|
||||||
tasks.withType(JavaExec) {
|
tasks.withType(JavaExec).configureEach {
|
||||||
jvmArgs = ['-Xms512m', '-Xmx8g']
|
jvmArgs = ['-Xms512m', '-Xmx8g']
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import eu.openaire.urls_controller.configuration.ImpalaConnector;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.dao.EmptyResultDataAccessException;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
@ -29,7 +30,7 @@ public class StatsController {
|
||||||
{
|
{
|
||||||
logger.info("Received a \"getNumberOfPayloads\" request.");
|
logger.info("Received a \"getNumberOfPayloads\" request.");
|
||||||
|
|
||||||
String getPayloadsNumberQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload";
|
final String getPayloadsNumberQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload";
|
||||||
try {
|
try {
|
||||||
Object result = jdbcTemplate.queryForObject(getPayloadsNumberQuery, Integer.class);
|
Object result = jdbcTemplate.queryForObject(getPayloadsNumberQuery, Integer.class);
|
||||||
if ( result != null ) {
|
if ( result != null ) {
|
||||||
|
@ -38,6 +39,8 @@ public class StatsController {
|
||||||
return new ResponseEntity<>(numOfPayloads, HttpStatus.OK);
|
return new ResponseEntity<>(numOfPayloads, HttpStatus.OK);
|
||||||
} else
|
} else
|
||||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The payloads' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getPayloadsNumberQuery: " + getPayloadsNumberQuery);
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The payloads' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getPayloadsNumberQuery: " + getPayloadsNumberQuery);
|
||||||
|
} catch (EmptyResultDataAccessException erdae) {
|
||||||
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The payloads' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getPayloadsNumberQuery: " + getPayloadsNumberQuery);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errorMsg = "Problem when executing \"getPayloadsNumberQuery\": " + getPayloadsNumberQuery;
|
String errorMsg = "Problem when executing \"getPayloadsNumberQuery\": " + getPayloadsNumberQuery;
|
||||||
logger.error(errorMsg, e);
|
logger.error(errorMsg, e);
|
||||||
|
@ -51,13 +54,13 @@ public class StatsController {
|
||||||
public ResponseEntity<?> getNumberOfRecordsInspected()
|
public ResponseEntity<?> getNumberOfRecordsInspected()
|
||||||
{
|
{
|
||||||
// Note that until all the records are inspected, the "attempt" table contains all the inspected records +very few duplicates (id-url) which come from the publications-database.
|
// Note that until all the records are inspected, the "attempt" table contains all the inspected records +very few duplicates (id-url) which come from the publications-database.
|
||||||
// After all the records are inspected, it contains duplicate records of more and more id-urls, as time goes one, since for every eligible record the Service re-attmepts to get the full-text.
|
// After all the records are inspected, it contains duplicate records of more and more id-urls, as time goes one, since for every eligible record the Service re-attempts to get the full-text.
|
||||||
// So in order to get the number of inspected records, we want the distinct number, which at some point it will remain stable, even though the Service will try aganin and again some of the records.
|
// So in order to get the number of inspected records, we want the distinct number, which at some point it will remain stable, even though the Service will try again and again some records.
|
||||||
// Before all the records are inspected, this endpoint will report all the inspected records MINUS the duplicate records which come straight from the "publication" table.
|
// Before all the records are inspected, this endpoint will report all the inspected records MINUS the duplicate records which come straight from the "publication" table.
|
||||||
|
|
||||||
logger.info("Received a \"getNumberOfRecordsInspected\" request.");
|
logger.info("Received a \"getNumberOfRecordsInspected\" request.");
|
||||||
|
|
||||||
String getInspectedRecordsNumberQuery = "select count(dist.id) from (select distinct id, original_url from " + ImpalaConnector.databaseName + ".attempt) as dist";
|
final String getInspectedRecordsNumberQuery = "select count(dist.id) from (select distinct id, original_url from " + ImpalaConnector.databaseName + ".attempt) as dist";
|
||||||
try {
|
try {
|
||||||
Object result = jdbcTemplate.queryForObject(getInspectedRecordsNumberQuery, Integer.class);
|
Object result = jdbcTemplate.queryForObject(getInspectedRecordsNumberQuery, Integer.class);
|
||||||
if ( result != null ) {
|
if ( result != null ) {
|
||||||
|
@ -66,6 +69,8 @@ public class StatsController {
|
||||||
return new ResponseEntity<>(numOfInspectedRecords, HttpStatus.OK);
|
return new ResponseEntity<>(numOfInspectedRecords, HttpStatus.OK);
|
||||||
} else
|
} else
|
||||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery);
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery);
|
||||||
|
} catch (EmptyResultDataAccessException erdae) {
|
||||||
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errorMsg = "Problem when executing \"getInspectedRecordsNumberQuery\": " + getInspectedRecordsNumberQuery;
|
String errorMsg = "Problem when executing \"getInspectedRecordsNumberQuery\": " + getInspectedRecordsNumberQuery;
|
||||||
logger.error(errorMsg, e);
|
logger.error(errorMsg, e);
|
||||||
|
|
|
@ -205,7 +205,7 @@ public class UrlController {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
|
public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
|
||||||
|
|
||||||
@PostMapping("addWorkerReport")
|
@PostMapping("addWorkerReport")
|
||||||
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
|
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
|
||||||
|
|
|
@ -188,8 +188,9 @@ public class ParquetFileUtils {
|
||||||
|
|
||||||
public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
|
public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
|
||||||
{
|
{
|
||||||
List<GenericData.Record> recordList = new ArrayList<>();
|
List<GenericData.Record> recordList = new ArrayList<>(urlReports.size());
|
||||||
GenericData.Record record;
|
GenericData.Record record;
|
||||||
|
|
||||||
for ( UrlReport urlReport : urlReports ) {
|
for ( UrlReport urlReport : urlReports ) {
|
||||||
Payload payload = urlReport.getPayload();
|
Payload payload = urlReport.getPayload();
|
||||||
if ( payload == null ) {
|
if ( payload == null ) {
|
||||||
|
@ -244,7 +245,7 @@ public class ParquetFileUtils {
|
||||||
|
|
||||||
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
|
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
|
||||||
{
|
{
|
||||||
List<GenericData.Record> recordList = new ArrayList<>();
|
List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
|
||||||
GenericData.Record record;
|
GenericData.Record record;
|
||||||
|
|
||||||
for ( UrlReport urlReport : urlReports )
|
for ( UrlReport urlReport : urlReports )
|
||||||
|
@ -315,6 +316,7 @@ public class ParquetFileUtils {
|
||||||
|
|
||||||
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema)
|
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema)
|
||||||
.withCompressionCodec(CompressionCodecName.GZIP).build())
|
.withCompressionCodec(CompressionCodecName.GZIP).build())
|
||||||
|
// When the app runs inside a Docker Container, it is NOT guaranteed that all compression-types will work. For example, the "SNAPPY"-compression does NOT work, while the "GZIP" works.
|
||||||
{
|
{
|
||||||
//logger.debug("Going to write to \"" + fullFilePath + "\" the record list: " + recordList); // DEBUG!
|
//logger.debug("Going to write to \"" + fullFilePath + "\" the record list: " + recordList); // DEBUG!
|
||||||
for ( GenericRecord record : recordList ) {
|
for ( GenericRecord record : recordList ) {
|
||||||
|
|
Loading…
Reference in New Issue