// TODO - Unify this ExecutorService with the hash-matching executorService, since one will ALWAYS be called after the other; there is no need to maintain two separate ExecServices.
"from (select distinct p.id as pubid, pu.url as url, pb.level as level, attempts.counts as attempt_count, p.year as pub_year, d.id as datasourceid, d.name as datasourcename\n"+// Select the distinct id-url data. Beware that this will return duplicate id-url pairs, wince one pair may be associated with multiple datasources.
" from "+DatabaseConnector.databaseName+".publication p\n"+
" join "+DatabaseConnector.databaseName+".publication_urls pu on pu.id=p.id\n"+
" join "+DatabaseConnector.databaseName+".datasource d on d.id=p.datasourceid\n"+// This is needed for the "d.allow_harvest=true" check later on.
" left outer join "+DatabaseConnector.databaseName+".publication_boost pb\n"+
" on p.id=pb.id\n"+
" left outer join (select count(a.id) as counts, a.id from "+DatabaseConnector.databaseName+".attempt a group by a.id) as attempts\n"+
" on attempts.id=p.id\n"+
" left outer join (\n"+
" select a.id, a.original_url from "+DatabaseConnector.databaseName+".assignment a\n"+
" union all\n"+
" select pl.id, pl.original_url from "+DatabaseConnector.databaseName+".payload pl\n"+// Here we access the payload-VIEW which includes the three payload-tables.
" ) as existing\n"+
" on existing.id=p.id and existing.original_url=pu.url\n"+
" where d.allow_harvest=true and existing.id is null\n"+// For records not found on existing, the "existing.id" will be null.
((excludedDatasourceIDsStringList!=null)?// If we have an exclusion-list, use it below.
(" and d.id not in "+excludedDatasourceIDsStringList+"\n"):"")+
" and coalesce(attempts.counts, 0) <= "+maxAttemptsPerRecordAtomic.get()+"\n"+
" and not exists (select 1 from "+DatabaseConnector.databaseName+".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n"+
" and pu.url != '' and pu.url is not null\n"+// Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
" and (p.year <= "+currentYear+" or p.year > "+(currentYear+5)+")\n"+// Exclude the pubs which will be published in the next 5 years. They don't provide full-texts now. (We don't exclude all future pubs, since, some have invalid year, like "9999").
") as distinct_results\n"+
"order by coalesce(attempt_count, 0), coalesce(level, 0) desc, coalesce(pub_year, 0) desc, reverse(pubid), url\n"+// We also order by reverse "pubid" and "url", in order to get the exactly same records for consecutive runs, all things being equal.
// The datasourceID and datasourceName are currently not used during the processing in the Worker. They may be used by the Worker in the future, to apply a datasource-specific aggregation plugin that retrieves the full-texts quickly, instead of using the general crawling one.
// However, the "datasourceid" is useful for generating the fileNames for S3, without needing to perform additional select queries (with JOINs) at that phase.
// For performance reasons, we collect the returned assignments and create a temp-table with them, so that later they can be copied efficiently to the "a-bit-more-permanent" assignment table.
// This way, we avoid having to manually insert thousands of assignment records there. Instead, we create a new table AS the results from the "findAssignmentsQuery".
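// To illustrate the temp-table creation described above, here is a minimal sketch, assuming Spring's "jdbcTemplate"
// and a hypothetical temp-table name "current_assignment"; the actual table name and storage clause may differ.
jdbcTemplate.execute("DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + ".current_assignment PURGE");
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + ".current_assignment STORED AS PARQUET AS\n" + findAssignmentsQuery);
// Afterwards, the rows of "current_assignment" can be copied into the more permanent "assignment" table with a single
// "INSERT INTO ... SELECT ..." statement, instead of thousands of single-row inserts.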
errorMsg="No results retrieved from the \"getAssignmentsQuery\" for worker with id: "+workerId+". Will increase the \"maxAttempts\" to "+maxAttemptsPerRecordAtomic.incrementAndGet()+" for the next requests.";
logger.error(errorMsg);
if(tmpErrMsg!=null){
errorMsg+="\n"+tmpErrMsg;// The additional error-msg is already logged.
int assignmentsSize = assignments.size();// It will not be zero here, since in the no-results case the "EmptyResultDataAccessException" is thrown and handled.
logger.warn("The retrieved results were fewer ("+assignmentsSize+") than the \"assignmentsLimit\" ("+assignmentsLimit+"), for worker with id: "+workerId+". Will increase the \"maxAttempts\" to "+maxAttemptsPerRecordAtomic.incrementAndGet()+", for the next requests.");
logger.debug("Finished gathering "+assignmentsSize+" assignments for worker with id \""+workerId+"\". Going to insert them into the \"assignment\" table and then return them to the worker.");
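// A minimal sketch of how the retrieval and the "no / fewer results" handling above could fit together, assuming
// Spring's JdbcTemplate; the "Assignment" setters are illustrative assumptions. Note that the actual code surfaces
// the "no results" case through an "EmptyResultDataAccessException" (as noted above); the empty list is checked
// directly here, for brevity.
List<Assignment> assignments = jdbcTemplate.query(getAssignmentsQuery, (rs, rowNum) -> {
	Assignment assignment = new Assignment();
	assignment.setId(rs.getString("pubid"));
	assignment.setOriginalUrl(rs.getString("url"));
	assignment.setDatasourceId(rs.getString("datasourceid"));
	return assignment;
});
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 )
	maxAttemptsPerRecordAtomic.incrementAndGet();	// Loosen the attempts-filter for the next requests and report the error to the caller.
else if ( assignmentsSize < assignmentsLimit )
	maxAttemptsPerRecordAtomic.incrementAndGet();	// Fewer results than requested: also loosen the attempts-filter.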
// Write the Assignment details to the assignment-table.
// We do not need to "merge" the parquet files for the "assignment" table here, since this happens every time we delete the assignments of a specific batch.
// Due to the fact that one publication with an id-url pair can be connected with multiple datasources, the results returned from the query may be duplicates.
// So, we apply a post-processing step where we collect only one instance of each id-url pair and send it to the Worker.
// We would certainly like the query to return assignments with unique id-url pairs, but for now there is no other solution.
// There is no logic-error though, since the worker requests a "LIMIT" of 5000, not "exactly 5000", and the actual number it ends up processing does not matter.
// So, it will ask for "up to 5000" and the Controller may return 4700.
// Since the average number of duplicates will be the same for each worker-request, none of the Workers will have any advantage over the other in the long run,
// so there will be no conflicting performance data between them.
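// A minimal sketch of that post-processing step, assuming an "Assignment" model with "getId()" and "getOriginalUrl()"
// getters (illustrative names). A LinkedHashMap keeps only the first Assignment seen for each id-url pair, while
// preserving the ordering produced by the query.
Map<String, Assignment> uniquePairs = new LinkedHashMap<>(assignments.size());
for ( Assignment assignment : assignments )
	uniquePairs.putIfAbsent(assignment.getId() + "_" + assignment.getOriginalUrl(), assignment);
List<Assignment> distinctAssignments = new ArrayList<>(uniquePairs.values());
int distinctAssignmentsSize = distinctAssignments.size();	// This is the number reported to the Worker below.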
logger.info("Sending batch-assignments_"+curAssignmentsBatchCounter+", with "+distinctAssignmentsSize+" assignments, to worker with ID: "+workerId+".");
long currentNumOfWorkerReportsProcessed = numOfWorkerReportsProcessed.incrementAndGet();// Increment it when the report is actually being processed, not when it arrives at the endpoint.
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give a full-text (even if we could not get it from the worker) are flagged as "couldRetry" anyway, so they will be picked up to be checked again later.
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_"+curReportAssignmentsCounter);
// Create HDFS subDirs for these assignments. This way, background threads handling other assignments will not interfere with the loading of the parquetFiles into the DB tables.
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
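// A minimal sketch of the subDir-creation step, assuming the Hadoop "FileSystem" API is available on the classpath
// (the actual implementation might use the WebHDFS REST-API instead); the directory paths and the method name are illustrative.
// Needs: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, java.io.IOException
private boolean createHdfsSubDirs(long curReportAssignmentsCounter) {
	try {
		FileSystem hdfs = FileSystem.get(new Configuration());
		Path attemptsSubDir = new Path("/output/parquet/attempts_" + curReportAssignmentsCounter);
		Path payloadsSubDir = new Path("/output/parquet/payloads_" + curReportAssignmentsCounter);
		return (hdfs.mkdirs(attemptsSubDir) && hdfs.mkdirs(payloadsSubDir));
	} catch (IOException ioe) {
		logger.error("Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter, ioe);
		return false;
	}
}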
throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid executing load-requests into the database, for batch-assignments_" + curReportAssignmentsCounter);
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid executing load-requests into the database-table \"attempt\", for batch-assignments_"+curReportAssignmentsCounter);
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid executing load-requests into the database-table \"payload_aggregated\", for batch-assignments_"+curReportAssignmentsCounter);
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_"+curReportAssignmentsCounter);
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_"+curReportAssignmentsCounter);
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_"+curReportAssignmentsCounter);
logger.error("Error when deleting the HDFS sub-directories for assignments_"+curReportAssignmentsCounter);// A directory-specific log has already appeared.
// The failure to delete the assignments_subDirs is not a big problem and should not abort the whole process; everything proceeds as planned (the worker deletes any remaining files).
// The worst case is that a few subDirs will be left behind in HDFS, although without their parquetFiles, since these have already been moved into the DB tables.
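// A minimal sketch of the clean-up step, under the same Hadoop "FileSystem" assumption as the subDir-creation sketch above; the paths are illustrative.
try {
	FileSystem hdfs = FileSystem.get(new Configuration());
	hdfs.delete(new Path("/output/parquet/attempts_" + curReportAssignmentsCounter), true);	// "true" = recursive
	hdfs.delete(new Path("/output/parquet/payloads_" + curReportAssignmentsCounter), true);
} catch (IOException ioe) {
	logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter, ioe);
	// Do not abort: a leftover (empty) subDir in HDFS is harmless, since its parquetFiles were already loaded into the DB tables.
}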
// If this method exits sooner, due to an error, then the assignments are not deleted; they wait for the schedulers to retry them, and meanwhile they are not given to workers again, to avoid reprocessing the urls.
// For every "numOfWorkers" assignment-batches that go to workers, we merge the tables, once a workerReport comes in.
// After the first few increases of the "assignmentsBatchCounter", until all workers have received assignment-batches,
// there will always be a point when the counter is just before the "golden-value"; then one more workerReport gets processed here, the counter is incremented by one, and the merging-time is signaled.
if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 )// The workersNum should not be zero! If a "division by zero" exception is thrown below, then there's a big bug somewhere in the design.
String tmpErrMsg = dropCurrentAssignmentTable();// The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
// This means that over time a table may have thousands of parquet files and the search through them will be very slow. Thus, we merge them every now and then.
// This will delete the rows of the "assignment" table which refer to the "givenAssignmentsBatchCounter".
// As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER "givenAssignmentsBatchCounter" get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, since the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
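// A minimal sketch of the "delete through merge" approach, assuming Spring's "jdbcTemplate", a temp-table named
// "assignment_tmp" and an "assignments_batch_counter" column (both names are illustrative). The same rewrite also
// compacts the table into fewer parquet files, which serves the periodic merging mentioned above.
String db = DatabaseConnector.databaseName;
jdbcTemplate.execute("CREATE TABLE " + db + ".assignment_tmp STORED AS PARQUET AS\n"
		+ "SELECT * FROM " + db + ".assignment WHERE assignments_batch_counter != " + givenAssignmentsBatchCounter);
jdbcTemplate.execute("DROP TABLE " + db + ".assignment PURGE");
jdbcTemplate.execute("ALTER TABLE " + db + ".assignment_tmp RENAME TO " + db + ".assignment");
jdbcTemplate.execute("COMPUTE STATS " + db + ".assignment");	// Refresh the table statistics after the rewrite.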
logger.trace("Going to \"postReportResultToWorker\": \""+workerId+"\", for assignments_"+assignmentRequestCounter+((errorMsg!=null)?"\nError: "+errorMsg:""));
logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \""+workerId+"\" and assignments_"+assignmentRequestCounter+"! Error-code was: "+responseCode);
logger.error("The Worker \""+workerId+"\" failed to handle the \"postReportResultToWorker\", of assignments_"+assignmentRequestCounter+": "+hsee.getMessage());
logger.error(errorMsg+" | The worker has probably crashed, since we received a \"Connection refused\"!");
workerInfo.setHasShutdown(true);// Avoid sending possible shutdown-Requests later on. Also show a specific message if this Worker requests new assignments in the future.
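// A minimal sketch of posting the report-result back to the Worker; the endpoint-path, the port and the
// "workerInfo.getWorkerIP()" accessor are illustrative assumptions. (The "HttpServerErrorException" above suggests
// the actual code uses Spring's RestTemplate; a plain HttpURLConnection is shown here for brevity.)
// Needs: java.net.URL, java.net.HttpURLConnection, java.net.ConnectException, java.nio.charset.StandardCharsets
private boolean postReportResultToWorker(String workerId, long assignmentRequestCounter, String errorMsg) {
	try {
		URL url = new URL("http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter);
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.setRequestMethod("POST");
		conn.setRequestProperty("Content-Type", "text/plain");
		conn.setDoOutput(true);
		conn.getOutputStream().write(((errorMsg != null) ? errorMsg : "").getBytes(StandardCharsets.UTF_8));
		int responseCode = conn.getResponseCode();
		if ( responseCode != 200 ) {
			logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \"" + workerId + "\" and assignments_" + assignmentRequestCounter + "! Error-code was: " + responseCode);
			return false;
		}
		return true;
	} catch (ConnectException ce) {
		logger.error("The worker \"" + workerId + "\" has probably crashed, since we received a \"Connection refused\"!");
		workerInfo.setHasShutdown(true);	// Avoid sending possible shutdown-requests later on.
		return false;
	} catch (Exception e) {
		logger.error("Error when posting the report-result to worker \"" + workerId + "\"!", e);
		return false;
	}
}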
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow));// TODO - If this is ever used, make it a global Thread-Local var. And then "clear" (reset) it after each use.
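// A minimal sketch of how the "giant-query" could be built on top of the StringBuilder above and then executed;
// the "Attempt" getters, the "quoted()" escaping-helper and the column list behind "baseInsertQuery" are illustrative
// assumptions, and "numParamsPerRow" would match that column list. Remember the Out-of-memory note above: this is shown for illustration only.
sb.append(baseInsertQuery);	// e.g. "INSERT INTO <db>.attempt (id, original_url, error_class) VALUES "
for ( int i = 0; i < dataSize; ++i ) {
	Attempt attempt = attempts.get(i);	// "attempts" is assumed to hold the rows to be inserted.
	sb.append("(").append(quoted(attempt.getId())).append(", ")
			.append(quoted(attempt.getOriginalUrl())).append(", ")
			.append(quoted(attempt.getErrorClass())).append(")");
	if ( i < (dataSize - 1) )
		sb.append(", ");
}
jdbcTemplate.execute(sb.toString());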