package eu.dnetlib.openaire.project.dao; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; import java.sql.*; import java.time.Duration; import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import javax.sql.DataSource; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import eu.dnetlib.DnetOpenaireExporterProperties; import eu.dnetlib.openaire.project.domain.Project; import eu.dnetlib.openaire.project.domain.db.ProjectDetails; import eu.dnetlib.openaire.project.domain.db.ProjectTsv; import org.antlr.stringtemplate.StringTemplate; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * Created by claudio on 20/09/16. */ @Component @ConditionalOnProperty(value = "openaire.exporter.enable.project", havingValue = "true") public class JdbcApiDaoImpl implements JdbcApiDao { public static final Charset UTF8 = Charset.forName("UTF-8"); private static final Log log = LogFactory.getLog(JdbcApiDaoImpl.class); @Autowired private DnetOpenaireExporterProperties config; @Autowired private DataSource dataSource; @Autowired private ProjectTsvRepository projectTsvRepository; @Override @Cacheable("fundingpath-ids") public Map readFundingpathIds() { log.debug("loading funding ids"); final String sql = "SELECT id FROM fundingpaths"; final Set ids = Sets.newHashSet(); try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = getRs(stm)) { while (rs.next()) { ids.add(rs.getString("id")); } } catch (SQLException e) { throw new RuntimeException(e); } log.debug(String.format("loaded %s funding ids", ids.size())); final Map res = Maps.newHashMap(); final Splitter sp = Splitter.on("::").trimResults(); ids.stream() .filter(s -> sp.splitToList(s).size() < 3) .forEach(s -> res.put(StringUtils.substringAfterLast(s, "::").toUpperCase(), s)); res.put("FP7", "ec__________::EC::FP7"); res.put("H2020", "ec__________::EC::H2020"); log.debug(String.format("processed %s funding ids", res.size())); res.forEach((k,v) -> log.debug(String.format("%s : '%s'", k, v))); return res; } @Override public void processProjectDetails(final OutputStream outputStream, String format, Boolean compress) throws IOException { final OutputStream out = getOutputStream(new BufferedOutputStream(outputStream), compress); try { final String sql = "SELECT * FROM project_details"; try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = getRs(stm)) { while (rs.next()) { try { switch (format) { case "csv": out.write(getProjectDetails(rs).asCSV().getBytes(UTF8)); break; case "json": out.write(getProjectDetails(rs).asJson().getBytes(UTF8)); break; } } catch (IOException e) { throw new RuntimeException(e); } } } catch (SQLException e) { throw new RuntimeException(e); } } finally { if (out instanceof GZIPOutputStream) { ((GZIPOutputStream) out).finish(); } out.close(); } } private OutputStream getOutputStream(final OutputStream outputStream, final Boolean compress) throws IOException { if (compress != null && compress) { return new GZIPOutputStream(outputStream); } return outputStream; } private ProjectDetails getProjectDetails(final ResultSet rs) throws SQLException { return new ProjectDetails() .setProjectId(rs.getString("projectid")) .setAcronym(rs.getString("acronym")) .setCode(rs.getString("code")) .setJsonextrainfo(rs.getString("jsonextrainfo")) .setFundingPath(asList(rs.getArray("fundingpath"))); } private String[] asList(final Array value) throws SQLException { if (value != null) { final List list = Arrays.asList((Object[]) value.getArray()); return list.stream() .map(o -> o != null ? o.toString() : null) .toArray(String[]::new); } return new String[0]; } @Override public void processTsvRequest(final ZipOutputStream out, final Boolean article293, final String fundingPrefix, final String filename) throws IOException { out.putNextEntry(new ZipEntry(filename)); writeTsvLine(out, Splitter.on(",").trimResults().splitToList(config.getProject().getTsvFields())); queryForTsv(fundingPrefix, article293).forEach(p -> { try { writeTsvLine(out, p.asList()); } catch (IOException e) { throw new RuntimeException(e); } }); } private void writeTsvLine(final ZipOutputStream out, final List s) throws IOException { out.write(Joiner.on('\t').useForNull("").join(s).getBytes(UTF8)); out.write('\n'); } private Iterable queryForTsv(final String fundingPrefix, final Boolean article293) { log.debug(String.format("fundingPrefix:'%s' and oa_mandate_for_datasets:'%s'", fundingPrefix, article293)); if (article293 != null) { return projectTsvRepository.findByFundingpathidStartingWithAndOaMandateForDatasetsOrderByAcronym(fundingPrefix, article293); } else { return projectTsvRepository.findByFundingpathidStartingWithOrderByAcronym(fundingPrefix); } } @Override public void streamProjects(final String sql, final OutputStream out, final String head, final StringTemplate projectTemplate, final String tail, final ValueCleaner cleaner) throws IOException, SQLException { if (log.isDebugEnabled()) { log.debug("Thread " + Thread.currentThread().getId() + " begin"); } final LocalDateTime start = LocalDateTime.now(); if (StringUtils.isNotBlank(head)) { out.write(head.getBytes(UTF8)); } try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = getRs(stm)) { while (rs.next()) { final Project p = new Project() .setFunder(cleaner.clean(rs.getString("funder"))) .setJurisdiction(cleaner.clean(rs.getString("jurisdiction"))) .setFundingpathid(cleaner.clean(rs.getString("fundingpathid"))) .setAcronym(cleaner.clean(rs.getString("acronym"))) .setTitle(cleaner.clean(rs.getString("title"))) .setCode(cleaner.clean(rs.getString("code"))) .setStartdate(cleaner.clean(rs.getString("startdate"))) .setEnddate(cleaner.clean(rs.getString("enddate"))); projectTemplate.reset(); projectTemplate.setAttribute("p", p); out.write(projectTemplate.toString().getBytes(UTF8)); } if (StringUtils.isNotBlank(tail)) { out.write(tail.getBytes(UTF8)); } final LocalDateTime end = LocalDateTime.now(); if (log.isDebugEnabled()) { log.debug("Thread " + Thread.currentThread().getId() + " ends, took: " + Duration.between(start, end).toMillis() + " ms"); } } } @Override @CacheEvict(cacheNames = { "fundingpath-ids" }, allEntries = true) @Scheduled(fixedDelayString = "${openaire.exporter.cache.ttl}") public void dropCache() { log.debug("dropped fundingpath ids cache"); } private Connection getConn() throws SQLException { final Connection connection = dataSource.getConnection(); connection.setAutoCommit(false); return connection; } private PreparedStatement getStm(final String sql, final Connection con) throws SQLException { final PreparedStatement stm = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY); stm.setFetchSize(config.getJdbc().getMaxRows()); return stm; } private ResultSet getRs(final PreparedStatement stm) throws SQLException { final ResultSet rs = stm.executeQuery(); rs.setFetchSize(config.getJdbc().getMaxRows()); return rs; } }