Changeset 5034
- Timestamp:
- 04/24/14 09:59:17 (10 years ago)
- Location:
- SRUAggregator/trunk
- Files:
-
- 57 added
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
SRUAggregator/trunk/pom.xml
r4106 r5034 12 12 </properties> 13 13 <packaging>war</packaging> 14 <name>Aggregator </name>14 <name>Aggregator-g</name> 15 15 <description>The Aggregator Project</description> 16 16 <organization> … … 32 32 <repository> 33 33 <id>sardine-google-svn-repo</id> 34 <snapshots> <enabled>true</enabled> </snapshots> 34 <snapshots> 35 <enabled>true</enabled> 36 </snapshots> 35 37 <name>Sardine maven repo at Google Code</name> 36 38 <url>http://sardine.googlecode.com/svn/maven/</url> … … 71 73 <groupId>com.sun.jersey</groupId> 72 74 <artifactId>jersey-client</artifactId> 75 <version>1.17.1</version> 76 </dependency> 77 <dependency> 78 <groupId>com.sun.jersey</groupId> 79 <artifactId>jersey-servlet</artifactId> 73 80 <version>1.17.1</version> 74 81 </dependency> … … 119 126 <version>2.2</version> 120 127 </dependency> 128 129 <dependency> 130 <groupId>junit</groupId> 131 <artifactId>junit</artifactId> 132 <version>4.10</version> 133 <scope>test</scope> 134 <type>jar</type> 135 </dependency> 136 137 <dependency> 138 <groupId>org.mockito</groupId> 139 <artifactId>mockito-all</artifactId> 140 <version>1.9.5</version> 141 <scope>test</scope> 142 </dependency> 143 121 144 </dependencies> 122 145 -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/SearchOptions.java
r4069 r5034 7 7 import eu.clarin.sru.fcs.aggregator.sopt.CenterRegistryForTesting; 8 8 import eu.clarin.sru.fcs.aggregator.sopt.CenterRegistryI; 9 import eu.clarin.sru.fcs.aggregator.sopt.CorporaScanCache;10 9 import eu.clarin.sru.fcs.aggregator.sopt.Corpus; 11 10 import eu.clarin.sru.fcs.aggregator.sopt.CorpusByInstitutionComparator; … … 13 12 import eu.clarin.sru.fcs.aggregator.sopt.CorpusByNameComparator; 14 13 import eu.clarin.sru.fcs.aggregator.sopt.CorpusByNameDComparator; 15 import eu.clarin.sru.fcs.aggregator.sopt.CorpusCache;16 14 import eu.clarin.sru.fcs.aggregator.sopt.CorpusModelCached; 17 15 import eu.clarin.sru.fcs.aggregator.sopt.CorpusModelI; … … 21 19 import eu.clarin.sru.fcs.aggregator.sopt.CorpusRendererLive; 22 20 import eu.clarin.sru.fcs.aggregator.sopt.Languages; 21 import eu.clarin.sru.fcs.aggregator.cache.ScanCacheI; 23 22 import java.lang.reflect.Type; 24 23 import java.util.ArrayList; 25 import java.util.Collection;26 24 import java.util.Collections; 27 25 import java.util.HashSet; … … 80 78 private SRUVersion version = SRUVersion.VERSION_1_2; 81 79 82 private CorporaScanCachecache;80 private ScanCacheI cache; 83 81 84 82 @Override … … 87 85 setUpSRUVersion(); 88 86 setUpAggerationContext(); 89 cache = ( CorporaScanCache) Executions.getCurrent().getDesktop().getWebApp().getAttribute(WebAppListener.CORPUS_CACHE);87 cache = (ScanCacheI) Executions.getCurrent().getDesktop().getWebApp().getAttribute(WebAppListener.CORPUS_CACHE); 90 88 //if (cache.isEmpty()) { 91 89 // liveMode = true; -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/WebAppListener.java
r4069 r5034 1 1 package eu.clarin.sru.fcs.aggregator.app; 2 2 3 import eu.clarin.sru.client.SRUClientException; 3 import eu.clarin.sru.fcs.aggregator.cache.ScanCrawlerRunnable; 4 import eu.clarin.sru.fcs.aggregator.cache.ScanCrawler; 5 import eu.clarin.sru.fcs.aggregator.cache.ScanCacheFiled; 6 import eu.clarin.sru.fcs.aggregator.cache.ScanCache; 4 7 import eu.clarin.sru.client.SRUThreadedClient; 5 8 import eu.clarin.sru.client.fcs.ClarinFCSRecordParser; 6 import eu.clarin.sru.fcs.aggregator.sopt.C orporaScanCache;7 import eu.clarin.sru.fcs.aggregator.sopt.C orpusCache;9 import eu.clarin.sru.fcs.aggregator.sopt.CenterRegistryI; 10 import eu.clarin.sru.fcs.aggregator.sopt.CenterRegistryLive; 8 11 import eu.clarin.sru.fcs.aggregator.sopt.Languages; 12 import eu.clarin.sru.fcs.aggregator.cache.ScanCacheI; 13 import java.io.File; 9 14 import java.io.IOException; 10 15 import java.io.InputStream; … … 13 18 import java.util.Set; 14 19 import java.util.Timer; 20 import java.util.concurrent.ExecutorService; 21 import java.util.concurrent.Executors; 22 import java.util.concurrent.ScheduledExecutorService; 23 import java.util.concurrent.TimeUnit; 15 24 import java.util.logging.Level; 16 25 import java.util.logging.Logger; … … 22 31 23 32 /** 24 * Application initialization and clean up: only one SRU threaded client is used 33 * Application initialization and clean up: only one SRU threaded client is used 25 34 * in the application, it has to be shut down when the application stops. One 26 35 * Languages object instance is used within the application. … … 34 43 public static final String LANGUAGES = "LANG"; 35 44 public static final String CORPUS_CACHE = "CORPUS_CACHE"; 36 37 45 private static final Logger LOGGER = Logger.getLogger(WebAppListener.class.getName()); 38 private static final int HOURS_BETWEEN_CACHE_UPDATE = 3; 39 40 private Timer cacheTimer; 41 46 //private static final int HOURS_BETWEEN_CACHE_UPDATE = 3; 47 //private Timer cacheTimer; 42 48 public static final String DE_TOK_MODEL = "/tokenizer/de-tuebadz-8.0-token.bin"; 49 private static final String AGGREGATOR_DIR_NAME = "aggregator"; 50 private static final String SCAN_DIR_NAME = "scan"; 51 private static final TimeUnit CACHE_UPDATE_INTERVAL_UNIT = TimeUnit.HOURS; 52 private static final int CACHE_UPDATE_INTERVAL = 5; 53 private static final int CACHE_MAX_DEPTH = 3; 54 private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 55 public static final String CORPUS_CRAWLER = "CORPUS_CRAWLER"; 43 56 44 57 @Override 45 58 public void init(WebApp webapp) throws Exception { 59 46 60 LOGGER.info("Aggregator is starting."); 61 47 62 Set<SearchResults> activeControllers = new HashSet<SearchResults>(); 48 63 webapp.setAttribute(ACTIVE_SEARCH_CONTROLLERS, activeControllers); 49 SRUThreadedClient searchClient = new SRUThreadedClient();50 searchClient.registerRecordParser(new ClarinFCSRecordParser());51 webapp.setAttribute(WebAppListener.SHARED_SRU_CLIENT, searchClient);52 64 65 SRUThreadedClient sruClient = new SRUThreadedClient(); 66 sruClient.registerRecordParser(new ClarinFCSRecordParser()); 67 webapp.setAttribute(WebAppListener.SHARED_SRU_CLIENT, sruClient); 68 53 69 Languages languages = new Languages(); 54 70 webapp.setAttribute(LANGUAGES, languages); 55 71 56 // set up timer to run the cache corpora scan info task 57 //cacheTimer = new Timer(); 58 //CorpusCache cache = new CorpusCache(); 59 //webapp.setAttribute(CORPUS_CACHE, cache); 72 setUpScanCache(webapp); 60 73 61 //DateTime date = new DateTime(); 62 //date = date.withHourOfDay(1); 63 //date = date.withMinuteOfHour(0); 64 //date = date.withSecondOfMinute(0); 65 //if (date.isBeforeNow()) { 66 // date = date.plusDays(1); 74 setUpTokenizers(webapp); 75 76 } 77 78 @Override 79 public void cleanup(WebApp webapp) throws Exception { 80 LOGGER.info("Aggregator is shutting down."); 81 Set<SearchResults> activeControllers = (Set<SearchResults>) webapp.getAttribute(ACTIVE_SEARCH_CONTROLLERS); 82 for (SearchResults activeController : activeControllers) { 83 activeController.shutdown(); 84 } 85 SRUThreadedClient searchClient = (SRUThreadedClient) webapp.getAttribute(WebAppListener.SHARED_SRU_CLIENT); 86 shutdownAndAwaitTermination(searchClient); 87 shutdownAndAwaitTermination(scheduler); 88 //cacheTimer.cancel(); 89 } 90 91 private String getScanDirectory() { 92 //File aggregatorDir = new File(System.getProperty("user.home"), "/." + AGGREGATOR_DIR_NAME); 93 File aggregatorDir = new File("/var/www", "/." + AGGREGATOR_DIR_NAME); 94 95 if (!aggregatorDir.exists()) { 96 aggregatorDir.mkdir(); 97 } 98 File scanDir = new File(aggregatorDir, SCAN_DIR_NAME); 99 if (!scanDir.exists()) { 100 scanDir.mkdir(); 101 } 102 String scanPath = scanDir.getAbsolutePath(); 103 LOGGER.info("Scan location: " + scanPath); 104 return scanPath; 105 } 106 107 private void setUpScanCache(WebApp webapp) { 108 109 ScanCacheFiled scanCacheFiled = new ScanCacheFiled(getScanDirectory()); 110 CenterRegistryI centerRegistry = new CenterRegistryLive(); 111 SRUThreadedClient sruScanClient = (SRUThreadedClient) webapp.getAttribute(WebAppListener.SHARED_SRU_CLIENT); 112 //EndpointUrlFilter filter = new EndpointUrlFilter(); 113 //filter.urlShouldContainAnyOf("leipzig", ".mpi.nl"); 114 //filter.urlShouldContainAnyOf("uni-tuebingen.de", ".mpi.nl"); 115 //filter.urlShouldContainAnyOf("dspin.dwds.de", "lindat."); 116 //ScanCrawler scanCrawler = new ScanCrawler(centerRegistry, sruScanClient, filter, maxDepth); 117 ScanCrawler scanCrawler = new ScanCrawler(centerRegistry, sruScanClient, null, CACHE_MAX_DEPTH); 118 ScanCacheI scanCache; 119 120 //synchronized (scanCrawler) { 121 LOGGER.info("Start cache read"); 122 try { 123 scanCache = scanCacheFiled.read(); 124 LOGGER.info("Finished cache read, number of root corpora: " + scanCache.getRootCorpora().size()); 125 } catch (Exception e) { 126 LOGGER.log(Level.SEVERE, "Error while reading the scan cache!", e); 127 scanCache = new ScanCache(); 128 } 67 129 //} 68 //LOGGER.info(date.toLocalTime().toString() + " " + date.toLocalTime().toString()); 69 //cacheTimer.scheduleAtFixedRate(new CacheCorporaScanTask(cache, searchClient), date.toDate(), HOURS_BETWEEN_CACHE_UPDATE * 3600000); 70 71 // read cache from file 72 CorporaScanCache cache = new CorporaScanCache(webapp.getRealPath("scan") + "/"); 73 webapp.setAttribute(CORPUS_CACHE, cache); 74 130 webapp.setAttribute(CORPUS_CACHE, scanCache); 131 webapp.setAttribute(CORPUS_CRAWLER, scanCrawler); 132 133 scheduler.scheduleAtFixedRate( 134 new ScanCrawlerRunnable(scanCrawler, scanCacheFiled, webapp), 135 0, CACHE_UPDATE_INTERVAL, CACHE_UPDATE_INTERVAL_UNIT); 136 137 } 138 139 private void shutdownAndAwaitTermination(SRUThreadedClient sruClient) { 140 // with shutdown() there are memory leaks when web app stops even if all requests have been processed; 141 // with shutdownNow() there are memory leaks when web app stops only if not all requests have been processed 142 //searchClient.shutdown(); 143 //searchClient.shutdownNow(); 144 try { 145 sruClient.shutdown(); // Disable new tasks from being submitted 146 // Wait 10 secs for existing tasks to terminate 147 // replace with awaitTermination if ever provided in SRUClient API 148 Thread.sleep(10000); 149 sruClient.shutdownNow(); // Cancel currently executing tasks 150 // Wait 10 secs for tasks to respond to being cancelled 151 // replace with awaitTermination if ever provided in SRUClient API 152 Thread.sleep(10000); 153 } catch (InterruptedException ie) { 154 // (Re-)Cancel if current thread also interrupted 155 sruClient.shutdownNow(); 156 // Preserve interrupt status 157 Thread.currentThread().interrupt(); 158 } 159 } 160 161 private void shutdownAndAwaitTermination(ExecutorService pool) { 162 pool.shutdown(); // Disable new tasks from being submitted 163 try { 164 // Wait a while for existing tasks to terminate 165 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 166 pool.shutdownNow(); // Cancel currently executing tasks 167 // Wait a while for tasks to respond to being cancelled 168 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 169 LOGGER.info("Pool did not terminate"); 170 } 171 } 172 } catch (InterruptedException ie) { 173 // (Re-)Cancel if current thread also interrupted 174 pool.shutdownNow(); 175 // Preserve interrupt status 176 Thread.currentThread().interrupt(); 177 } 178 } 179 180 private void setUpTokenizers(WebApp webapp) { 75 181 TokenizerModel model = null; 76 182 try { … … 83 189 webapp.setAttribute(DE_TOK_MODEL, model); 84 190 } 85 86 @Override87 public void cleanup(WebApp webapp) throws Exception {88 LOGGER.info("Aggregator is shutting down.");89 Set<SearchResults> activeControllers = (Set<SearchResults>) webapp.getAttribute(ACTIVE_SEARCH_CONTROLLERS);90 for (SearchResults activeController : activeControllers) {91 activeController.shutdown();92 }93 SRUThreadedClient searchClient = (SRUThreadedClient) webapp.getAttribute(WebAppListener.SHARED_SRU_CLIENT);94 // with shutdown() there are memory leaks when web app stops even if all requests have been processed;95 // with shutdownNow() there are memory leaks when web app stops only if not all requests have been processed96 searchClient.shutdownNow();97 //cacheTimer.cancel();98 }99 191 } -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/CorporaScanCache.java
r3059 r5034 1 1 package eu.clarin.sru.fcs.aggregator.sopt; 2 2 3 import eu.clarin.sru.fcs.aggregator.cache.ScanCacheI; 3 4 import eu.clarin.sru.fcs.aggregator.app.CacheCorporaScanIntoFileTask; 4 5 import java.io.BufferedInputStream; … … 25 26 * @author Yana Panchenko 26 27 */ 27 public class CorporaScanCache { 28 @Deprecated 29 public class CorporaScanCache implements ScanCacheI { 28 30 29 31 private Map<String, List<Corpus>> enpUrlToRootCorpora = new LinkedHashMap<String, List<Corpus>>(30); … … 45 47 46 48 public CorporaScanCache(String corporaScanDir) { 47 createCache(corporaScanDir);49 readCache(corporaScanDir); 48 50 } 49 51 … … 63 65 return rootCorpora; 64 66 } 65 66 public List<Corpus> getRootCorpora(String lang) { 67 68 69 70 @Override 71 public List<Corpus> getRootCorporaOfEndpoint(String enpointUrl) { 72 return this.enpUrlToRootCorpora.get(enpointUrl); 73 } 74 75 public List<Corpus> getRootCorporaForLang(String lang) { 67 76 List<Corpus> rootCorpora = new ArrayList<Corpus>(enpUrlToRootCorpora.size()); 68 77 for (List<Corpus> corpora : this.enpUrlToRootCorpora.values()) { … … 76 85 } 77 86 87 @Override 78 88 public Set<String> getLanguages() { 79 89 Set<String> languages = new HashSet<String>(this.langToCorpora.size()); … … 82 92 } 83 93 94 @Override 84 95 public List<Corpus> getChildren(Corpus corpus) { 85 96 List<Corpus> corpora = this.corpusToChildren.get(corpus); … … 98 109 } 99 110 100 private void createCache(String corporaScanDir) {111 private void readCache(String corporaScanDir) { 101 112 File sruInstitutionsFile = new File(corporaScanDir + "inst.txt"); 102 113 BufferedReader reader = null; … … 292 303 System.out.println(cache); 293 304 } 305 306 @Override 307 public Map<String, Set<Corpus>> getRootCorporaForLang() { 308 throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 309 } 310 311 @Override 312 public Map<String, Set<Corpus>> getTopUniqueLangToCorpora() { 313 throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 314 } 315 316 @Override 317 public List<Corpus> getTopUniqueLanguageCorpora(String lang) { 318 throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 319 } 320 294 321 } -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/CorpusModelCached.java
r3059 r5034 1 1 package eu.clarin.sru.fcs.aggregator.sopt; 2 2 3 import eu.clarin.sru.fcs.aggregator.cache.ScanCacheI; 3 4 import java.util.ArrayList; 4 5 import java.util.Arrays; … … 21 22 private static final Logger logger = Logger.getLogger(CorpusModelCached.class.getName()); 22 23 private Map<String, Set<Corpus>> selectedCorpora = new HashMap<String, Set<Corpus>>(); 23 private CorporaScanCachecache;24 private ScanCacheI cache; 24 25 25 public CorpusModelCached( CorporaScanCachecache) {26 public CorpusModelCached(ScanCacheI cache) { 26 27 super(new DefaultTreeNode(new Corpus(), new ArrayList<DefaultTreeNode<Corpus>>())); 27 28 this.cache = cache; -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/Endpoint.java
r3044 r5034 36 36 return url; 37 37 } 38 39 @Override 40 public int hashCode() { 41 int hash = 7; 42 hash = 59 * hash + (this.url != null ? this.url.hashCode() : 0); 43 return hash; 44 } 45 46 @Override 47 public boolean equals(Object obj) { 48 if (obj == null) { 49 return false; 50 } 51 if (getClass() != obj.getClass()) { 52 return false; 53 } 54 final Endpoint other = (Endpoint) obj; 55 if ((this.url == null) ? (other.url != null) : !this.url.equals(other.url)) { 56 return false; 57 } 58 return true; 59 } 60 61 62 63 38 64 } -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/Institution.java
r3059 r5034 2 2 3 3 import java.util.*; 4 import java.util.logging.Logger;5 4 6 5 /** … … 23 22 @Override 24 23 public Endpoint add(String endpointUrl) { 25 Endpoint ep = new Endpoint(endpointUrl, this); 26 endpoints.add(ep); 24 Endpoint ep = getEndpoint(endpointUrl); 25 if (ep == null) { 26 ep = new Endpoint(endpointUrl, this); 27 endpoints.add(ep); 28 } 27 29 return ep; 28 30 } … … 53 55 54 56 @Override 57 public Endpoint getEndpoint(String endpointUrl) { 58 for (Endpoint ep : endpoints) { 59 if (ep.getUrl().equals(endpointUrl)) { 60 return ep; 61 } 62 } 63 return null; 64 } 65 66 @Override 55 67 public String toString() { 56 68 if (name != null && name.length() > 0) { … … 60 72 } 61 73 } 74 75 62 76 } -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/InstitutionI.java
r3059 r5034 20 20 21 21 public Endpoint getEndpoint(int index); 22 23 public Endpoint getEndpoint(String endpointUrl); 22 24 } -
SRUAggregator/trunk/src/main/webapp/WEB-INF/web.xml
r2502 r5034 40 40 <url-pattern>/zkau/*</url-pattern> 41 41 </servlet-mapping> 42 43 <servlet> 44 <servlet-name>ServletAdaptor</servlet-name> 45 <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class> 46 <init-param> 47 <param-name>javax.ws.rs.Application</param-name> 48 <param-value>eu.clarin.sru.fcs.aggregator.rest.AggregatorService</param-value> 49 </init-param> 50 <load-on-startup>1</load-on-startup> 51 </servlet> 52 <servlet-mapping> 53 <servlet-name>ServletAdaptor</servlet-name> 54 <url-pattern>/service/*</url-pattern> 55 </servlet-mapping> 56 42 57 <welcome-file-list> 43 58 <welcome-file>index.zul</welcome-file>
Note: See TracChangeset
for help on using the changeset viewer.