// TODO - Unify this ExecutorService with the hash-matching executorService, since one will ALWAYS be called after the other; there is no reason to manage two separate ExecutorServices.
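// A minimal sketch of the unification suggested above (the executor- and method-names below are hypothetical, not taken from this class):
// one shared thread-pool runs the hash-matching step and the step which ALWAYS follows it, so only a single ExecutorService has to be created, monitored and shut down.
ExecutorService sharedExecutor = Executors.newFixedThreadPool(6);
sharedExecutor.submit(() -> {
	matchHashesOfAssignments();	// Hypothetical: the work currently done by the hash-matching executorService.
	handlePostHashMatchingWork();	// Hypothetical: the work currently done by this ExecutorService.
});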
"select pubid, url, datasourceid, datasourcename\n"+// Select the final sorted data with "assignmentsLimit".
"from (select distinct pubid, url, datasourceid, datasourcename, attempt_count, pub_year\n"+// Select the distinct id-url data. Beware that this will return duplicate id-url paris, wince one pair may be associated with multiple datasources.
" from (select p.id as pubid, pu.url as url, pb.level as level, attempts.counts as attempt_count, p.year as pub_year, d.id as datasourceid, d.name as datasourcename\n"+// Select all needed columns frm JOINs, order by "boost.level" and limit them to (assignmentsLimit * 10)
" join "+ImpalaConnector.databaseName+".publication_urls pu on pu.id=p.id\n"+
" join "+ImpalaConnector.databaseName+".datasource d on d.id=p.datasourceid\n"+// This is needed for the "d.allow_harvest=true" check later on.
" left outer join "+ImpalaConnector.databaseName+".publication_boost pb\n"+
" on p.id=pb.id\n"+
" left outer join (select count(a.id) as counts, a.id from "+ImpalaConnector.databaseName+".attempt a group by a.id) as attempts\n"+
" on attempts.id=p.id\n"+
" left outer join (\n"+
" select a.id, a.original_url from "+ImpalaConnector.databaseName+".assignment a\n"+
" union all\n"+
" select pl.id, pl.original_url from "+ImpalaConnector.databaseName+".payload pl\n"+// Here we access the payload-VIEW which includes the three payload-tables.
" ) as existing\n"+
" on existing.id=p.id and existing.original_url=pu.url\n"+
" where d.allow_harvest=true and existing.id is null\n"+// For records not found on existing, the "existing.id" will be null.
((excludedDatasourceIDsStringList!=null)?// If we have an exclusion-list, use it below.
(" and d.id not in "+excludedDatasourceIDsStringList+"\n"):"")+
" and coalesce(attempts.counts, 0) <= "+maxAttemptsPerRecordAtomic.get()+"\n"+
" and not exists (select 1 from "+ImpalaConnector.databaseName+".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n"+
" and pu.url != '' and pu.url is not null\n"+// Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
" order by coalesce(level, -1000) desc\n"+
" limit "+(assignmentsLimit*10)+"\n"+
" ) as non_distinct_results\n"+
" order by coalesce(attempt_count, 0), coalesce(pub_year, 0) desc, reverse(pubid), url\n"+// We also order by reverse "pubid" and "url", in order to get the exactly same records for consecutive runs, all things being equal.
" limit "+assignmentsLimit+"\n"+
") as findAssignmentsQuery";
// The datasourceID and datasourceName are currently not used during the processing in the Worker. They may be used by the Worker in the future, to apply a datasource-specific aggregation plugin which retrieves the full-texts quickly, instead of using the general crawling one.
// However, the "datasourceid" is useful for generating the fileNames for S3, without needing to perform additional select queries (with JOINs) at that phase.
// For performance reasons, we collect the returned assignments and create a temp-table with them, so that later they can be copied efficiently to the "a-bit-more-permanent" assignment table.
// This way, we avoid having to manually insert thousands of assignment records there. Instead, we create a new table AS the results from the "findAssignmentsQuery".
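// A minimal sketch of the "create a new table AS the results" step described above. The "current_assignment" table-name, the "jdbcTemplate" and the variable holding the built query are assumptions for illustration.
String createCurrentAssignmentsQuery = "create table "+ImpalaConnector.databaseName+".current_assignment as \n"+findAssignmentsQuery;
jdbcTemplate.execute(createCurrentAssignmentsQuery);	// Assuming a Spring JdbcTemplate (or a plain Statement) is available here.
jdbcTemplate.execute("compute stats "+ImpalaConnector.databaseName+".current_assignment");	// Optional in Impala, but it helps the planner with the queries which follow.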
errorMsg="No results retrieved from the \"findAssignmentsQuery\" for worker with id: "+workerId+". Will increase the \"maxAttempts\" to "+maxAttemptsPerRecordAtomic.incrementAndGet()+" for the next requests.";
logger.error(errorMsg);
String tmpErrMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
if(tmpErrMsg!=null){
errorMsg+="\n"+tmpErrMsg;// The additional error-msg is already logged.
logger.warn("The retrieved results were fewer ("+assignmentsSize+") than the \"assignmentsLimit\" ("+assignmentsLimit+"), for worker with id: "+workerId+". Will increase the \"maxAttempts\" to "+maxAttemptsPerRecordAtomic.incrementAndGet()+", for the next requests.");
logger.debug("Finished gathering "+assignmentsSize+" assignments for worker with id \""+workerId+"\". Going to insert them into the \"assignment\" table and then return them to the worker.");
// Write the Assignment details to the assignment-table.
// Due to the fact that one publication with an id-url pair can be connected with multiple datasources, the results returned from the query may be duplicates.
// So, we apply a post-processing step where we collect only one instance of each id-url pair and send it to the Worker.
// We would certainly prefer the query to return assignments with unique id-url pairs, but for now there is no other solution.
// There is no logic-error though: the worker requests a "LIMIT" of 5000, not "exactly 5000", and the actual number it ends up processing does not matter.
// So, it asks for "up to 5000" and the Controller may return 4700.
// Since the average number of duplicates will be roughly the same for each worker-request, none of the Workers will have an advantage over the others in the long run,
// so there will be no conflicting performance data between them.
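// A minimal sketch of the deduplication step described in the comments above, assuming an "Assignment" bean with "getId()" and "getOriginalUrl()" getters (names are illustrative).
HashSet<String> seenIdUrlPairs = new HashSet<>(assignments.size());
List<Assignment> distinctAssignments = new ArrayList<>(assignments.size());
for ( Assignment assignment : assignments ) {
	if ( seenIdUrlPairs.add(assignment.getId() + assignment.getOriginalUrl()) )	// "add()" returns false when the pair was already seen, so duplicates are skipped.
		distinctAssignments.add(assignment);
}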
long currentNumOfWorkerReportsProcessed = numOfWorkerReportsProcessed.incrementAndGet();// Increment it when the report is actually processed, not when it arrives at the endpoint.
// We write only the payloads which are connected with retrieved full-texts that were uploaded to the S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which lead to a full-text (even if we could not retrieve it from the worker) are flagged as "couldRetry" anyway, so they will be picked up to be checked again later.
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_"+curReportAssignmentsCounter);
// Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables.
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_"+curReportAssignmentsCounter);
thrownewRuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_"+curReportAssignmentsCounter);
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_"+curReportAssignmentsCounter);
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_"+curReportAssignmentsCounter);
thrownewRuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_"+curReportAssignmentsCounter);
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_"+curReportAssignmentsCounter);
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_"+curReportAssignmentsCounter);
logger.error("Error when deleting the HDFS sub-directories for assignments_"+curReportAssignmentsCounter);// A directory-specific log has already appeared.
// The failure to delete the assignments_subDirs is not that big of a problem and should not abort the whole process; everything else proceeds as planned (the worker deletes any remaining files).
// The worst case is that a few subDirs will be left behind in the HDFS, although without their parquetFiles, since those have already been moved inside the DB tables.
// For every "numOfWorkers" assignment-batches that go to workers, we merge the tables, once a workerReport comes in.
// After the first few increases of "assignmentsBatchCounter" until all workers get assignment-batches,
// there will always be a time when the counter will be just before the "golden-value" and then one workerReport has to be processed here and the counter will be incremented by one and signal the merging-time.
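// A minimal sketch of the "golden-value" check described above (the "numOfWorkers" source and the merge-helper are assumptions for illustration).
if ( (currentNumOfWorkerReportsProcessed % numOfWorkers) == 0 )	// Every "numOfWorkers" processed workerReports..
	mergeWorkerRelatedTables();	// Hypothetical helper which merges the parquet files of the tables which were altered.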
String tmpErrMsg = dropCurrentAssignmentTable();// The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
logger.debug("Going to merge the parquet files for the tables which were altered.");
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
// This means that over time a table may have thousands of parquet files and searching through them will be very slow. Thus, we merge them every now and then.
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
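// A minimal sketch of the delete-via-merge sequence described above (the "workerid" column and the temp-table name are assumptions; the statements themselves are standard Impala SQL).
jdbcTemplate.execute("create table "+ImpalaConnector.databaseName+".assignment_tmp stored as parquet as select * from "+ImpalaConnector.databaseName+".assignment where workerid != '"+curWorkerId+"'");
jdbcTemplate.execute("drop table "+ImpalaConnector.databaseName+".assignment purge");
jdbcTemplate.execute("alter table "+ImpalaConnector.databaseName+".assignment_tmp rename to "+ImpalaConnector.databaseName+".assignment");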
logger.error("Could not find any info for worker with id: \""+workerId+"\".");
return false;
}
Stringurl="http://"+workerInfo.getWorkerIP()+":1881/api/addReportResultToWorker/"+assignmentRequestCounter;// This workerIP will not be null.
if(logger.isTraceEnabled())
logger.trace("Going to \"postReportResultToWorker\": \""+workerId+"\", for assignments_"+assignmentRequestCounter+((errorMsg!=null)?"\nError: "+errorMsg:""));
logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \""+workerId+"\" and assignments_"+assignmentRequestCounter+"! Error-code was: "+responseCode);
logger.error("The Worker \""+workerId+"\" failed to handle the \"postReportResultToWorker\", of assignments_"+assignmentRequestCounter+": "+hsee.getMessage());
logger.error(errorMsg+" | The worker has probably crashed, since we received a \"Connection refused\"!");
workerInfo.setHasShutdown(true);// Avoid sending possible shutdown-Requests later on. Also show a specific message if this Worker requests new assignments in the future.
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow));// TODO - If this is ever used, make it a global Thread-Local var. And then "clear" (reset) it after each use.
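// A minimal sketch of how the StringBuilder above could build the "giant" multi-row INSERT (the placeholder-layout is an assumption; "baseInsertQuery" is expected to end right before the values-list).
sb.append(baseInsertQuery);
for ( int i = 0; i < dataSize; ++i ) {
	sb.append("(");
	for ( int j = 0; j < numParamsPerRow; ++j )
		sb.append("?").append((j < (numParamsPerRow - 1)) ? ", " : "");
	sb.append(")").append((i < (dataSize - 1)) ? ", " : "");
}
String giantInsertQuery = sb.toString();	// Executed afterwards as a single prepared-statement, with all the parameters set in order.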