package eu.openaire.urls_controller.controllers;

import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Assignment;
import eu.openaire.urls_controller.models.Datasource;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import eu.openaire.urls_controller.util.TestFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
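
/**
 * Test-endpoints of the Controller: they serve test-assignments from a local json-file,
 * run a sample query against the Impala database and upload a sample parquet-file to HDFS.
 */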
@RestController
@RequestMapping("/test")
public class TestController {

    private static final Logger logger = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private ParquetFileUtils parquetFileUtils;

    @Autowired
    private TestFileUtils testFileUtils;

    @Value("${services.pdfaggregation.controller.assignmentLimit}")
    private int assignmentLimit;

    private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Keeps the number of assignments-batches which have been served so far.
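
    /**
     * Reads id-url pairs from the local test-resource json-file and returns them as test-assignments,
     * up to the given worker-limit. Each assignment is paired with a randomly-numbered test-datasource.
     * A hypothetical example call, assuming the Controller listens locally on port 8080:
     * curl "http://localhost:8080/test/urls?workerId=worker_1&workerAssignmentsLimit=100"
     */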
    @GetMapping("urls")
    public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {

        logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the Controller is: " + this.assignmentLimit);

        logger.debug("Going to retrieve the data from the inputResourceFile: " + testFileUtils.testResource.getFilename());

        List<Assignment> assignments = new ArrayList<>();
        HashMultimap<String, String> loadedIdUrlPairs;
        boolean isFirstRun = true;
        boolean assignmentsLimitReached = false;

        Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same timestamp for all current records.

        // Start loading urls.
        while ( true ) {
            loadedIdUrlPairs = testFileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from the json-file.

            if ( testFileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws a RuntimeException, which is automatically propagated to the caller.
                break;
            else
                isFirstRun = false;

            Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();

            for ( Map.Entry<String, String> pair : pairs ) {
                if ( assignments.size() >= workerAssignmentsLimit ) {
                    assignmentsLimitReached = true;
                    break;
                }
                int randomNum = GenericUtils.getRandomNumber(1, 5);
                assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp));
            } // end pairs-for-loop

            if ( assignmentsLimitReached ) {
                logger.debug("Done loading urls from the inputFile, as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
                break;
            }
        } // end loading-while-loop

        Scanner scanner = testFileUtils.inputScanner.get();
        if ( scanner != null ) // It may hold its initial null value, in case no scanner was ever created.
            scanner.close();

        long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();

        logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + testFileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);

        return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
    }
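
    /**
     * Retrieves the ids of the first 10 publications from the database.
     * This endpoint mainly verifies that the Impala-connection and the "publication" table work as expected.
     */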
    @GetMapping("get10PublicationIdsTest")
    public ResponseEntity<?> get10PublicationIdsTest() {
        String query = "SELECT id FROM " + ImpalaConnector.databaseName + ".publication LIMIT 10;";
        try {
            List<String> publications = jdbcTemplate.queryForList(query, String.class);
            return new ResponseEntity<>(publications.toString(), HttpStatus.OK);
        } catch (Exception e) {
            String errorMsg = "Problem when executing the publication-ids query: " + query;
            logger.error(errorMsg, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }
    }
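
    /**
     * Uploads the local parquet-file "1_attempts_0.parquet" (expected to exist in the current working directory)
     * to the "attempts" parquet-directory in HDFS.
     * This endpoint mainly verifies that the HDFS-connection works as expected.
     */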
    @GetMapping("parquet_upload")
    public ResponseEntity<?> uploadParquetFile() {
        logger.debug("We got a \"parquet_upload\" request.");

        String parquetFileName = "1_attempts_0.parquet";
        String parquetFileFullPath = System.getProperty("user.dir") + File.separator + parquetFileName;

        String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(parquetFileFullPath, parquetFileName, parquetFileUtils.parquetHDFSDirectoryPathAttempts);
        if ( errorMsg != null ) // The error-message is already logged by the Controller.
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

        return ResponseEntity.ok().build();
    }
}