115 lines
6.6 KiB
Java
115 lines
6.6 KiB
Java
package eu.openaire.pdf_aggregation_statistics.services;
|
|
|
|
import eu.openaire.pdf_aggregation_statistics.components.SchedulingTasks;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.dao.EmptyResultDataAccessException;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
import java.sql.SQLException;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
@Service
|
|
public class StatsServiceImpl implements StatsService {
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(StatsServiceImpl.class);
|
|
|
|
@Autowired
|
|
private JdbcTemplate jdbcTemplate;
|
|
|
|
@Value("${database-name}")
|
|
private String databaseName;
|
|
|
|
// No DB-lock is required for these READ-operations.
|
|
|
|
public static final ConcurrentHashMap<String, Integer> datasourcesWithNumOfPayloads = new ConcurrentHashMap<>(105_000); // The number of datasources is around 104_500 at the moment.
|
|
|
|
|
|
private static final int maxRetries = 10;
|
|
|
|
|
|
public boolean gatherNumberOfPayloadsPerDatasource(int retryCount)
|
|
{
|
|
if ( retryCount > maxRetries ) {
|
|
logger.error("Could not find the requested payload-type table in an non-merging state, after " + (retryCount -1) + " retries!");
|
|
return false;
|
|
}
|
|
|
|
final String getNumberOfPayloadsPerDatasourceQuery =
|
|
"select d.id, count(p.id) as payload_count from " + databaseName + ".datasource d\n" +
|
|
" join " + databaseName + ".publication pu on pu.datasourceid=d.id\n" +
|
|
" left join " + databaseName + ".payload p on p.id=pu.id\n" + // We want the datasources with 0 payloads too, so we use "left join"
|
|
" group by d.id"; // The group-by is needed.
|
|
|
|
if ( logger.isTraceEnabled() )
|
|
logger.trace("getNumberOfPayloadsPerDatasourceQuery:\n" + getNumberOfPayloadsPerDatasourceQuery);
|
|
|
|
logger.info("Going to " + (SchedulingTasks.runningFirstTime ? "populate" : "update") + " the \"datasourcesWithNumOfPayloads\" map." + ((retryCount > 0) ? (" Retry " + retryCount + " out of " + maxRetries + ".") : ""));
|
|
final int[] countUpdatedDatasources = {0};
|
|
final int[] countNewDatasources = {0};
|
|
final int[] countAddedPayloads = {0};
|
|
try {
|
|
jdbcTemplate.query(getNumberOfPayloadsPerDatasourceQuery, rs -> {
|
|
try { // For each of the 4 columns returned, do the following. The column-indexing starts from 1
|
|
int newValue = rs.getInt(2);
|
|
Integer oldValue = datasourcesWithNumOfPayloads.put(rs.getString(1), newValue); // Updates the number for an existing datasourceId or adds a new mapping for a new datasourceId.
|
|
if ( !SchedulingTasks.runningFirstTime ) {
|
|
if ( oldValue == null ) {
|
|
countNewDatasources[0]++;
|
|
countAddedPayloads[0] += newValue;
|
|
} else {
|
|
int diff = newValue - oldValue;
|
|
if ( diff != 0 ) { // The diff may be positive for added payloads or negative if a deduplication or any other fix was performed and the number was reduced.
|
|
countUpdatedDatasources[0]++;
|
|
countAddedPayloads[0] += diff;
|
|
// A negative diff wil result in substituting that number from the total. So we may have 3000 added payloads and 100 removed, so the final "newPayloads" will be 2900.
|
|
}
|
|
}
|
|
}
|
|
} catch (SQLException sqle) {
|
|
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
|
|
}
|
|
});
|
|
logger.info("The \"datasourcesWithNumOfPayloads\" map was " + (SchedulingTasks.runningFirstTime ? "populated with the payload-numbers for " + datasourcesWithNumOfPayloads.size() + " datasources." : ("updated. " + countNewDatasources[0] + " new datasources were added and " + countUpdatedDatasources[0] + " were updated. The number of added payloads is " + countAddedPayloads[0] + ".")));
|
|
return true;
|
|
} catch (EmptyResultDataAccessException erdae) {
|
|
logger.error("The number of payloads per datasource could not be retrieved from the database \"" + databaseName + "\" using the getNumberOfPayloadsPerDatasourceQuery: " + getNumberOfPayloadsPerDatasourceQuery);
|
|
return false;
|
|
} catch (Exception e) {
|
|
String exMsg = e.getMessage();
|
|
if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) {
|
|
logger.info("The tables are probably in a merging state. Will wait 2 minutes and try again.");
|
|
sleep2mins(); // The tables may be under merging at the moment, so sleep a bit and try again.
|
|
return gatherNumberOfPayloadsPerDatasource(++retryCount);
|
|
} else {
|
|
// If such an unknown error appears during initialization, it is fatal but not something that is so remarkable to completely avoid deploying the app to save time..
|
|
// We allow for 1 retry, 2 Minutes later. If the error appears again then the app will shutdown.
|
|
logger.error("Problem when executing \"getNumberOfPayloadsPerDatasourceQuery\": " + getNumberOfPayloadsPerDatasourceQuery, e);
|
|
if ( retryCount == 0 ) {
|
|
sleep2mins(); // The DB may have some failure
|
|
return gatherNumberOfPayloadsPerDatasource(++retryCount);
|
|
} else // Already 1 retry happened and failed for the unknown error.
|
|
return false; // If the 1st retry for the unknown error failed, then do not try again.
|
|
// When this method returns, the app will either shut down if it is during initialization or it will ignore it and retry in 6 hours.
|
|
}
|
|
}
|
|
}
|
|
|
|
// To get the human-friendly timestamp format from the BigInt in the database:
|
|
// select from_timestamp(CAST(CAST(`date` as decimal(30,0))/1000 AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
|
|
|
|
|
|
private void sleep2mins() {
|
|
try {
|
|
Thread.sleep(120_000); // Sleep for 2 mins.
|
|
} catch (InterruptedException ie) {
|
|
logger.warn("Sleeping was interrupted!");
|
|
}
|
|
}
|
|
|
|
}
|