source: SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java @ 6049

Last change on this file since 6049 was 6049, checked in by emanuel.dima@uni-tuebingen.de, 9 years ago
  1. alpha 22: backup for corpora cache file + fix popup trigger
File size: 12.1 KB
package eu.clarin.sru.fcs.aggregator.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.optimaize.langdetect.LanguageDetector;
import com.optimaize.langdetect.LanguageDetectorBuilder;
import com.optimaize.langdetect.ngram.NgramExtractors;
import com.optimaize.langdetect.profiles.LanguageProfile;
import com.optimaize.langdetect.profiles.LanguageProfileReader;
import com.optimaize.langdetect.text.*;
import eu.clarin.sru.fcs.aggregator.search.Search;
import eu.clarin.sru.fcs.aggregator.scan.ScanCrawlTask;
import eu.clarin.sru.fcs.aggregator.scan.Corpora;
import eu.clarin.sru.client.SRUVersion;
import eu.clarin.sru.client.fcs.ClarinFCSClientBuilder;
import eu.clarin.sru.client.fcs.ClarinFCSEndpointDescriptionParser;
import eu.clarin.sru.fcs.aggregator.client.ThrottledClient;
import eu.clarin.sru.fcs.aggregator.scan.Corpus;
import eu.clarin.sru.fcs.aggregator.rest.RestService;
import eu.clarin.sru.fcs.aggregator.scan.Statistics;
import eu.clarin.sru.fcs.aggregator.lang.LanguagesISO693_3;
import io.dropwizard.Application;
import io.dropwizard.assets.AssetsBundle;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import opennlp.tools.tokenize.TokenizerModel;
import org.slf4j.LoggerFactory;

/**
 * Main component of the Aggregator application, intended to provide users
 * with access to CLARIN-FCS resources.
 *
 * The webapp base URL corresponds to the default behavior of displaying the
 * main aggregator page, where the user can enter a query, select the
 * resources of the CQL endpoints (as specified in the CLARIN centre
 * registry), and search in these resources. The endpoint/resource selection
 * is optional; by default all the endpoints' root resources are selected.
 *
 * If invoked with the 'x-aggregation-context' and 'query' parameters, the
 * aggregator will pre-select the provided resources and fill in the query
 * field. This mechanism is currently used by the VLO. Example:
 * <pre>
 * POST http://weblicht.sfs.uni-tuebingen.de/Aggregator HTTP/1.1
 *
 * operation=searchRetrieve&version=1.2&query=bellen&x-aggregation-context=
 *   {"http://fedora.clarin-d.uni-saarland.de/sru/":["hdl:11858/00-246C-0000-0008-5F2A-0"]}
 * </pre>
 *
 * Additionally, if invoked with the URL query string parameter 'mode', a
 * special behavior of the aggregator is triggered:
 *
 * /?mode=testing corresponds to the mode where the CQL endpoints are taken
 * not from the CLARIN centre registry, but from a hard-coded list of
 * endpoints; this is useful for testing development instances of endpoints
 * before they are moved to production. Added at the request of MPI.
 *
 * /?mode=search corresponds to the mode where the aggregator page is
 * requested with an already known query and (optionally) a set of resources
 * to search in, and an immediate search is desired. In this case the
 * aggregator results page is displayed and the search results for the
 * provided query start to fill it in immediately (i.e. users don't need to
 * click 'search' on the aggregator page). Added at the request of CLARIN
 * ERIC (Martin Wynne contacted us).
 *
 * /?mode=live corresponds to the mode where the information about corpora is
 * taken not from the scan cache (crawled in advance), but loaded live,
 * starting with a request to the centre registry and then performing scan
 * operation requests on each CQL endpoint listed there. It takes time to get
 * the corresponding responses from the endpoints, so the aggregator page
 * loads very slowly in this mode. It is, however, useful for testing newly
 * added or changed corpora without waiting for the next crawl.
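 *
 * Illustrative invocations (paths assume the deployment URL from the example
 * above; the query value is just an example):
 * <pre>
 * /Aggregator/?mode=testing
 * /Aggregator/?mode=live
 * /Aggregator/?mode=search&query=bellen
 * </pre>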
 *
 *
 * Handles application initialization and cleanup: only one threaded SRU
 * client is used in the application, and it has to be shut down when the
 * application stops. One Languages object instance is used within the
 * application.
 *
 * @author Yana Panchenko
 * @author edima
 *
 * TODO: zoom into the results from a corpus, allow functionality only for
 * the view (search for next set of results)
 *
 * TODO: use WebLicht with results
 *
 * TODO: export search results to personal workspace as csv, excel, tcf,
 * plain text: ask Marie/Wei about OAuth ways to do that (ndg oauth); ask
 * Menzo, Willem, Twan (they did a test, it worked)
 *
 * TODO: websockets
 *
 * TODO: show multiple hits on the same result in multiple rows, linked
 * visually
 *
 * TODO: optimise page load
 *
 * TODO: test it in IE and other browsers
 *
 * TODO: tri-state for parent collections; search + message implications
 *
 * TODO: improve help page text
 *
 */
public class Aggregator extends Application<AggregatorConfiguration> {

	private static final org.slf4j.Logger log = LoggerFactory.getLogger(Aggregator.class);

	public static final String DE_TOK_MODEL = "tokenizer/de-tuebadz-8.0-token.bin";

	private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
	private static Aggregator instance;
	private AggregatorConfiguration.Params params;

	private AtomicReference<Corpora> scanCacheAtom = new AtomicReference<Corpora>(new Corpora());
	private AtomicReference<Statistics> scanStatsAtom = new AtomicReference<Statistics>(new Statistics());
	private AtomicReference<Statistics> searchStatsAtom = new AtomicReference<Statistics>(new Statistics());

	private TokenizerModel tokenizerModel;
	private LanguageDetector languageDetector;
	private TextObjectFactory textObjectFactory;

	private ThrottledClient sruScanClient = null;
	private ThrottledClient sruSearchClient = null;
	private Map<Long, Search> activeSearches = Collections.synchronizedMap(new HashMap<Long, Search>());

	public static void main(String[] args) throws Exception {
		new Aggregator().run(args);
	}

	@Override
	public String getName() {
		return "FCS Aggregator";
	}

	@Override
	public void initialize(Bootstrap<AggregatorConfiguration> bootstrap) {
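		// Serve the static frontend from the classpath directory /assets at the
		// application root, with index.html as the default page.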
		bootstrap.addBundle(new AssetsBundle("/assets", "/", "index.html", "static"));
	}

	@Override
	public void run(AggregatorConfiguration config, Environment environment) throws Exception {
		params = config.aggregatorParams;
		instance = this;

		System.out.println("Using parameters: ");
		try {
			System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter().
					writeValueAsString(config.aggregatorParams));
		} catch (IOException xc) {
			// ignore; pretty-printing the configuration is purely informational
		}

		environment.getApplicationContext().setErrorHandler(new ErrorPageHandler());
		environment.jersey().setUrlPattern("/rest/*");
		environment.jersey().register(new RestService());

		try {
			init();
		} catch (Exception ex) {
			log.error("INIT EXCEPTION", ex);
			throw ex; // force exit
		}
	}

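	// The aggregator is effectively a singleton: the instance is set in run()
	// and accessed through this getter, e.g. by the REST service.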
	public static Aggregator getInstance() {
		return instance;
	}

	public AggregatorConfiguration.Params getParams() {
		return params;
	}

	public Corpora getCorpora() {
		return scanCacheAtom.get();
	}

	public Statistics getScanStatistics() {
		return scanStatsAtom.get();
	}

	public Statistics getSearchStatistics() {
		return searchStatsAtom.get();
	}

	public void init() throws IOException {
		log.info("Aggregator initialization started.");
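		// Two throttled SRU clients are used: one for scan (endpoint
		// harvesting) and one for search; each limits the number of
		// concurrent requests per endpoint.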
		sruScanClient = new ThrottledClient(
				new ClarinFCSClientBuilder()
				.setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
				.setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
				.addDefaultDataViewParsers()
				.registerExtraResponseDataParser(
						new ClarinFCSEndpointDescriptionParser())
				.enableLegacySupport()
				.buildThreadedClient(),
				params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
		);
		sruSearchClient = new ThrottledClient(
				new ClarinFCSClientBuilder()
				.setConnectTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
				.setSocketTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
				.addDefaultDataViewParsers()
				.enableLegacySupport()
				.buildThreadedClient(),
				params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
		);

		File corporaCacheFile = new File(params.AGGREGATOR_FILE_PATH);
		File corporaOldCacheFile = new File(params.AGGREGATOR_FILE_PATH_BACKUP);

		// init corpora from file
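		// Try the primary cache file first; if it cannot be read or parsed,
		// fall back to the backup copy written by a previous crawl.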
		{
			Corpora corpora = null;
			try {
				corpora = new ObjectMapper().readValue(corporaCacheFile, Corpora.class);
			} catch (Exception xc) {
				log.error("Failed to load cached corpora from primary file:", xc);
			}
			if (corpora == null) {
				try {
					corpora = new ObjectMapper().readValue(corporaOldCacheFile, Corpora.class);
				} catch (Exception e) {
					log.error("Failed to load cached corpora from backup file:", e);
				}
			}
			if (corpora != null) {
				scanCacheAtom.set(corpora);
				log.info("corpus list read from file; number of root corpora: " + scanCacheAtom.get().getCorpora().size());
			}
		}

		LanguagesISO693_3.getInstance(); // force init
		initTokenizer();
		initLanguageDetector();

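		// Schedule the periodic scan crawl that refreshes the corpora cache
		// (and its backup file) at a fixed interval.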
		ScanCrawlTask task = new ScanCrawlTask(sruScanClient,
				params.CENTER_REGISTRY_URL, params.SCAN_MAX_DEPTH,
				params.additionalCQLEndpoints,
				null, scanCacheAtom, corporaCacheFile, corporaOldCacheFile,
				scanStatsAtom, searchStatsAtom);
		scheduler.scheduleAtFixedRate(task, params.SCAN_TASK_INITIAL_DELAY,
				params.SCAN_TASK_INTERVAL, params.getScanTaskTimeUnit());

		log.info("Aggregator initialization finished.");
	}

	public void shutdown(AggregatorConfiguration config) {
		log.info("Aggregator is shutting down.");
		for (Search search : activeSearches.values()) {
			search.shutdown();
		}
		shutdownAndAwaitTermination(config.aggregatorParams, sruScanClient, scheduler);
		shutdownAndAwaitTermination(config.aggregatorParams, sruSearchClient, scheduler);
		log.info("Aggregator shutdown complete.");
	}

	// this method must be thread-safe
	public Search startSearch(SRUVersion version, List<Corpus> corpora,
			String searchString, String searchLang, int maxRecords) throws Exception {
		if (corpora.isEmpty()) {
			// no corpora to search in
			return null;
		} else if (searchString.isEmpty()) {
			// no query given
			return null;
		} else {
			Search sr = new Search(sruSearchClient, version, searchStatsAtom.get(),
					corpora, searchString, searchLang, 1, maxRecords);
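			// Opportunistic cleanup: whenever the number of active searches is
			// a multiple of 100, drop searches older than 30 minutes.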
			if ((activeSearches.size() % 100) == 0) {
				List<Long> toBeRemoved = new ArrayList<Long>();
				long t0 = System.currentTimeMillis();
				// iterating over a synchronizedMap requires manual locking
				synchronized (activeSearches) {
					for (Map.Entry<Long, Search> e : activeSearches.entrySet()) {
						if (t0 - e.getValue().getCreatedAt() > 1800 * 1000) { // 30 minutes
							toBeRemoved.add(e.getKey());
						}
					}
				}
				for (Long l : toBeRemoved) {
					activeSearches.remove(l);
				}
			}
			activeSearches.put(sr.getId(), sr);
			return sr;
		}
	}

	public Search getSearchById(Long id) {
		return activeSearches.get(id);
	}

	public TokenizerModel getTokenizerModel() {
		return tokenizerModel;
	}

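	// Best-effort shutdown: ask the client and the scheduler to stop, wait a
	// fixed grace period, then force termination (note: this sleeps for the
	// configured timeout instead of polling with awaitTermination).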
	private static void shutdownAndAwaitTermination(AggregatorConfiguration.Params params,
			ThrottledClient sruClient, ExecutorService scheduler) {
		try {
			sruClient.shutdown();
			scheduler.shutdown();
			Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
			sruClient.shutdownNow();
			scheduler.shutdownNow();
			Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
		} catch (InterruptedException ie) {
			sruClient.shutdownNow();
			scheduler.shutdownNow();
			Thread.currentThread().interrupt();
		}
	}

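	// Loads the German OpenNLP tokenizer model from the classpath;
	// tokenizerModel remains null if loading fails.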
	private void initTokenizer() {
		TokenizerModel model = null;
		try (InputStream tokenizerModelDeAsIS = Thread.currentThread()
				.getContextClassLoader().getResourceAsStream(DE_TOK_MODEL)) {
			model = new TokenizerModel(tokenizerModelDeAsIS);
		} catch (IOException ex) {
			log.error("Failed to load tokenizer model", ex);
		}
		tokenizerModel = model;
	}

	public void initLanguageDetector() throws IOException {
		List<LanguageProfile> languageProfiles = new LanguageProfileReader().readAll();
		languageDetector = LanguageDetectorBuilder
				.create(NgramExtractors.standard())
				.withProfiles(languageProfiles)
				.build();

		textObjectFactory = CommonTextObjectFactories.forDetectingOnLargeText();
	}

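	/**
	 * Detects the language of the given text.
	 *
	 * @return the detected language, or null if no language could be
	 *         detected with sufficient confidence
	 */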
	public String detectLanguage(String text) {
		return languageDetector.detect(textObjectFactory.forText(text)).orNull();
	}
}