forked from D-Net/dnet-hadoop
Modified the mapping to include the groups. Added a step to the workflow to send directly to the catalogue.
This commit is contained in:
parent
2d380aea1d
commit
0d10e3bd22
|
@ -260,7 +260,7 @@ public class Mapper implements Serializable {
|
|||
.orElse("");
|
||||
|
||||
if (!publisher.equals("")) {
|
||||
groups.add(publisher.toLowerCase());
|
||||
groups.add(publisher.toLowerCase().replace(".", "-"));
|
||||
externals
|
||||
.add(
|
||||
KeyValue
|
||||
|
@ -292,8 +292,8 @@ public class Mapper implements Serializable {
|
|||
hbSet.remove("Unknown Repository");
|
||||
externals.add(KeyValue.newInstance("Risis2_Publishing:Hosted By", getListOfValues(hbSet)));
|
||||
|
||||
cfSet.forEach(cf -> groups.add(cf.toLowerCase()));
|
||||
hbSet.forEach(hb -> groups.add(hb.toLowerCase()));
|
||||
cfSet.forEach(cf -> groups.add(cf.toLowerCase().replace(".", "-")));
|
||||
hbSet.forEach(hb -> groups.add(hb.toLowerCase().replace(".", "-")));
|
||||
|
||||
groups.forEach(g -> {
|
||||
if (publishers.contains(g.trim())) {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.http.HttpStatus;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
|
@ -44,7 +45,8 @@ public class SendToCatalogue implements Serializable {
|
|||
new Path(hdfsPath), true);
|
||||
GCatAPIClient gCatAPIClient = new GCatAPIClient();
|
||||
gCatAPIClient.setApplicationToken(access_token);
|
||||
gCatAPIClient.purgeAll();
|
||||
int purged = gCatAPIClient.purgeAll();
|
||||
log.info("purged: " + purged);
|
||||
while (fileStatusListIterator.hasNext()) {
|
||||
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||
|
||||
|
@ -59,7 +61,9 @@ public class SendToCatalogue implements Serializable {
|
|||
BufferedReader reader = new BufferedReader(new FileReader("/tmp/" + name));
|
||||
String line;
|
||||
while((line=reader.readLine())!= null){
|
||||
gCatAPIClient.publish(line);
|
||||
if (HttpStatus.SC_CREATED != gCatAPIClient.publish(line)){
|
||||
log.error("entry non created for item " + line);
|
||||
}
|
||||
}
|
||||
reader.close();
|
||||
log.info("deleting information for: " + name);
|
||||
|
|
Loading…
Reference in New Issue