forked from lsmyrnaios/UrlsController
- Avoid passing some duplicate publications to the Workers, by post-processing the assignments retrieved from Impala. The assignments may contain duplicate id-url pairs, which have different datasources, since one publication may be connected to multiple datasources.
- Code polishing.
This commit is contained in:
parent
b94c35c66e
commit
fd1cf56863
|
@ -130,7 +130,7 @@ public class StatsController {
|
|||
// In case the "numTopDatasources" param is not given or is less or equal to 0, then no limit will be added to the query.
|
||||
/*
|
||||
select d.id, d.name, d.type, d.allow_harvest, count(p.id) as payload_count from datasource d
|
||||
join publication pu on pu.datasourceid=d.id
|
||||
join publication pu on pu.datasourceid=d.id -- We want the datasources with at least 1 publication.
|
||||
left join payload p on p.id=pu.id -- We want the datasources with 0 payloads too, so we use "left join".
|
||||
group by d.id, d.name, d.type, d.allow_harvest
|
||||
order by payload_count desc
|
||||
|
|
|
@ -109,34 +109,37 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
{
|
||||
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
|
||||
String findAssignmentsQuery =
|
||||
"select pubid, url, datasourceid, datasourcename\n" + // The datsourceName is currently not used. It may be used by the Worker, in the future to apply a datasource-specific aggregation plugin to take the full-texts quickly, instead of using the general crawling one.
|
||||
"from (select distinct pubid, url, datasourceid, datasourcename, attempt_count, pub_year\n" +
|
||||
" from (select p.id as pubid, p.year as pub_year, pu.url as url, pb.level as level, d.id as datasourceid, d.name as datasourcename, attempts.counts as attempt_count\n" +
|
||||
"select pubid, url, datasourceid, datasourcename\n" + // Select the final sorted data with "assignmentsLimit".
|
||||
"from (select distinct pubid, url, datasourceid, datasourcename, attempt_count, pub_year\n" + // Select the distinct id-url data. Beware that this will return duplicate id-url paris, wince one pair may be associated with multiple datasources.
|
||||
" from (select p.id as pubid, pu.url as url, pb.level as level, attempts.counts as attempt_count, p.year as pub_year, d.id as datasourceid, d.name as datasourcename\n" + // Select all needed columns frm JOINs, order by "boost.level" and limit them to (assignmentsLimit * 10)
|
||||
" from " + ImpalaConnector.databaseName + ".publication p\n" +
|
||||
" join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
|
||||
" join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
|
||||
" left outer join " + ImpalaConnector.databaseName + ".publication_boost pb\n" +
|
||||
" on p.id=pb.id\n" +
|
||||
" left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts\n" +
|
||||
" on attempts.id=p.id\n" +
|
||||
" left outer join (\n" +
|
||||
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
|
||||
" union all\n" +
|
||||
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables.
|
||||
" ) as existing\n" +
|
||||
" on existing.id=p.id and existing.original_url=pu.url\n" +
|
||||
" where d.allow_harvest=true and existing.id is null\n" + // For records not found on existing, the "existing.id" will be null.
|
||||
((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
|
||||
(" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") +
|
||||
" and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" +
|
||||
" and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
|
||||
" and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
|
||||
" order by coalesce(level, -1000) desc\n" +
|
||||
" limit " + (assignmentsLimit * 10) + ")\n" +
|
||||
" as non_distinct_results\n" +
|
||||
" order by coalesce(attempt_count, 0), coalesce(pub_year, 0) desc, reverse(pubid), url\n" + // We also order by "id" and "url", in order to get the exactly same records for consecutive runs, all things being equal.
|
||||
" limit " + assignmentsLimit + ")\n" +
|
||||
"as findAssignmentsQuery";
|
||||
" join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
|
||||
" join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" + // This is needed for the "d.allow_harvest=true" check later on.
|
||||
" left outer join " + ImpalaConnector.databaseName + ".publication_boost pb\n" +
|
||||
" on p.id=pb.id\n" +
|
||||
" left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts\n" +
|
||||
" on attempts.id=p.id\n" +
|
||||
" left outer join (\n" +
|
||||
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
|
||||
" union all\n" +
|
||||
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables.
|
||||
" ) as existing\n" +
|
||||
" on existing.id=p.id and existing.original_url=pu.url\n" +
|
||||
" where d.allow_harvest=true and existing.id is null\n" + // For records not found on existing, the "existing.id" will be null.
|
||||
((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
|
||||
(" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") +
|
||||
" and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" +
|
||||
" and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
|
||||
" and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
|
||||
" order by coalesce(level, -1000) desc\n" +
|
||||
" limit " + (assignmentsLimit * 10) + "\n" +
|
||||
" ) as non_distinct_results\n" +
|
||||
" order by coalesce(attempt_count, 0), coalesce(pub_year, 0) desc, reverse(pubid), url\n" + // We also order by reverse "pubid" and "url", in order to get the exactly same records for consecutive runs, all things being equal.
|
||||
" limit " + assignmentsLimit + "\n" +
|
||||
") as findAssignmentsQuery";
|
||||
|
||||
// The datasourceID and datasourceName are currently not used during the processing in the Worker. They may be used by the Worker, in the future to apply a datasource-specific aggregation plugin to take the full-texts quickly, instead of using the general crawling one.
|
||||
// However, the "datasourceid" is useful to be able to generate the fileNames for the S3, without needing to perform additional select queries (with JOINs) at that phase.
|
||||
|
||||
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
|
||||
//logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery); // DEBUG!
|
||||
|
@ -149,6 +152,8 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
|
||||
ImpalaConnector.databaseLock.lock();
|
||||
|
||||
// For performance reasons, we collect the returned assignments and create a temp-table with them, so that later they can be copied efficiently to the "a-bit-more-permanent" assignment table.
|
||||
// This way, we avoid having to manually insert thousands of assignment records there. Instead, we create a new table AS the results from the "findAssignmentsQuery".
|
||||
String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
|
||||
if ( errorMsg != null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
|
@ -240,8 +245,32 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
|
||||
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
|
||||
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
|
||||
// Due to the fact that one publication with an id-url pair can be connected with multiple datasources, the results returned from the query may be duplicates.
|
||||
// So, we apply a post-processing step where we collect only one instance of each id-url pair and send it to the Worker.
|
||||
|
||||
// We would certainly like the query to return assignments with unique id-url pairs, but for now there is no other solution.
|
||||
// There is no logic-error though, since the worker requests a "LIMIT" of 5000 not "exactly 5000" nor it matters the actual number it will process.
|
||||
// So, it will ask for "up to 5000" and the Controller may return 4700.
|
||||
// Since the average number of duplicates will be the same for each worker-request, none of the Workers will have any advantage over the other in the long run,
|
||||
// so there will be no conflicting performance data between them.
|
||||
|
||||
final HashMap<String, Assignment> uniquePairsAndAssignments = new HashMap<>((int) (assignmentsLimit * 0.9));
|
||||
|
||||
for ( Assignment assignment : assignments ) {
|
||||
uniquePairsAndAssignments.put(assignment.getId() + "_" + assignment.getOriginalUrl(), assignment);
|
||||
// This will just update the duplicate record with another "assignment object", containing a different datasource.
|
||||
}
|
||||
|
||||
List<Assignment> distinctAssignments = new ArrayList<>(uniquePairsAndAssignments.values());
|
||||
int distinctAssignmentsSize = distinctAssignments.size();
|
||||
|
||||
if ( logger.isTraceEnabled() ) {
|
||||
logger.trace("numDuplicates in returned assignments = " + (assignmentsSize - distinctAssignmentsSize));
|
||||
logger.trace("Size of \"distinctAssignments\": " + distinctAssignments.size());
|
||||
}
|
||||
|
||||
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + distinctAssignmentsSize + " assignments to worker with ID: " + workerId + ".");
|
||||
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, distinctAssignments));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -151,7 +151,7 @@ public class FileUtils {
|
|||
// The table does not exist, so it was not even half-created by the previous query.
|
||||
// Not having the original table anymore is a serious error. A manual action is needed!
|
||||
logger.error((errorMsg += "The original table \"" + tableName + "\" must be created manually! Serious problems may appear otherwise!"));
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and fix it immediately to avoid other errors in the Service)..!
|
||||
}
|
||||
// Here, the original-table exists in the DB, BUT without any data inside! This worker Report failed to handled! (some of its data could not be loaded to the database, and all previous data was lost).
|
||||
return errorMsg;
|
||||
|
|
Loading…
Reference in New Issue