source: SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java @ 5931

Last change on this file since 5931 was 5931, checked in by emanuel.dima@uni-tuebingen.de, 9 years ago
  1. alpha12: better language filtering using detection, collections view improvements
package eu.clarin.sru.fcs.aggregator.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.optimaize.langdetect.LanguageDetector;
import com.optimaize.langdetect.LanguageDetectorBuilder;
import com.optimaize.langdetect.ngram.NgramExtractors;
import com.optimaize.langdetect.profiles.LanguageProfile;
import com.optimaize.langdetect.profiles.LanguageProfileReader;
import com.optimaize.langdetect.text.*;
import eu.clarin.sru.fcs.aggregator.search.Search;
import eu.clarin.sru.fcs.aggregator.scan.ScanCrawlTask;
import eu.clarin.sru.fcs.aggregator.scan.Corpora;
import eu.clarin.sru.client.SRUVersion;
import eu.clarin.sru.client.fcs.ClarinFCSClientBuilder;
import eu.clarin.sru.fcs.aggregator.client.ThrottledClient;
import eu.clarin.sru.fcs.aggregator.scan.Corpus;
import eu.clarin.sru.fcs.aggregator.rest.RestService;
import eu.clarin.sru.fcs.aggregator.scan.Statistics;
import eu.clarin.sru.fcs.aggregator.lang.LanguagesISO693_3;
import io.dropwizard.Application;
import io.dropwizard.assets.AssetsBundle;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import opennlp.tools.tokenize.TokenizerModel;
import org.slf4j.LoggerFactory;
/**
 * Main component of the Aggregator application, intended to provide users
 * access to CLARIN-FCS resources.
 *
 * The webapp base URL corresponds to the default behavior of displaying the
 * main aggregator page, where the user can enter a query, select the resources
 * of CQL endpoints (as specified in the CLARIN center registry), and search in
 * these resources. The endpoint/resource selection is optional; by default the
 * root resources of all endpoints are selected.
 *
 * If invoked with the 'x-aggregation-context' and 'query' parameters, the
 * aggregator will pre-select the provided resources and fill in the query
 * field. This mechanism is currently used by the VLO. Example: POST
 * http://weblicht.sfs.uni-tuebingen.de/Aggregator HTTP/1.1 operation =
 * searchRetrieve & version = 1.2 & query = bellen & x-aggregation-context =
 * {"http://fedora.clarin-d.uni-saarland.de/sru/":["hdl:11858/00-246C-0000-0008-5F2A-0"]}
 *
 *
 * Additionally, if run with a URL query string parameter 'mode', special
 * behavior of the aggregator is triggered:
 *
 * /?mode=testing corresponds to the mode where the CQL endpoints are taken not
 * from the CLARIN center registry, but from a hard-coded list of endpoints;
 * this functionality is useful for testing development instances of endpoints
 * before they are moved to production. Done to meet a request from the MPI.
 *
 * /?mode=search corresponds to the mode where the aggregator page is requested
 * with an already known query, (optionally) the resources to search in, and an
 * immediate search is desired. In this case the aggregator displays the search
 * results page and the results of the provided query start to fill it in
 * immediately (i.e. users don't need to click 'search' on the aggregator
 * page). Done to meet a request from CLARIN ERIC (Martin Wynne contacted us).
 *
 * /?mode=live corresponds to the mode where the information about corpora is
 * taken not from the scan cache (crawled in advance), but loaded live, starting
 * with a request to the center registry and then performing scan operation
 * requests on each CQL endpoint listed there. Since it takes time to get the
 * corresponding responses from the endpoints, the aggregator page loads very
 * slowly in this mode. It is useful, however, for testing newly added or
 * changed corpora without waiting for the next crawl.
 *
 *
 * This class also handles application initialization and cleanup: only one SRU
 * threaded client is used in the application, and it has to be shut down when
 * the application stops. One Languages object instance is used within the
 * application.
 *
 * @author Yana Panchenko
 * @author edima
 *
 * TODO: fix ordering of corpora in corpora view
 *
 * TODO: corpora search should not indicate the ones that don't match
 *
 * TODO: tri-state for parent collections; search + message implications
 *
 * TODO: version page: credits, open source, see vcr/version page
 *
 * TODO: statistics page linked from version page
 *
 * TODO: 1. support new spec-compatible centres, see Oliver's mail
 * (use SRUClient's extraResponseData POJOs)
 *
 * TODO: 2. zoom into the results from a corpus, allow functionality only for
 * the view (search for next set of results)
 *
 * TODO: disable popups easily
 *
 * TODO: Fix activeSearches memory leak (gc searches older than...)
 *
 * TODO: Use weblicht with results
 *
 * TODO: Export to personal workspace as csv, excel, tcf, plain text
 *
 * TODO: Download to personal workspace as csv, excel, tcf, plain text
 *
 * TODO: websockets
 *
 * TODO: atomic replace of cached corpora (file)
 *
 * TODO: show multiple hits on the same result in multiple rows, linked visually
 *
 * TODO: optimise page load
 *
 * TODO: improve help page text
 *
 */
public class Aggregator extends Application<AggregatorConfiguration> {

        private static final org.slf4j.Logger log = LoggerFactory.getLogger(Aggregator.class);

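        // Classpath location of the OpenNLP tokenizer model for German
        // (trained on the TüBa-D/Z 8.0 treebank, judging by the file name).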
        public static final String DE_TOK_MODEL = "tokenizer/de-tuebadz-8.0-token.bin";

        private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        private static Aggregator instance;
        private AggregatorConfiguration.Params params;

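        // State snapshots, swapped in atomically: the scan crawl task replaces the
        // corpora cache and the scan statistics, searches update the search statistics.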
        private AtomicReference<Corpora> scanCacheAtom = new AtomicReference<Corpora>(new Corpora());
        private AtomicReference<Statistics> scanStatsAtom = new AtomicReference<Statistics>(new Statistics());
        private AtomicReference<Statistics> searchStatsAtom = new AtomicReference<Statistics>(new Statistics());

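        // Language identification components: an OpenNLP tokenizer model and an
        // optimaize n-gram language detector (see initTokenizer/initLanguageDetector).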
        private TokenizerModel tokenizerModel;
        private LanguageDetector languageDetector;
        private TextObjectFactory textObjectFactory;

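        // One throttled SRU client each for scan and for search requests; all running
        // searches are kept in a synchronized map, keyed by search id.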
        private ThrottledClient sruScanClient = null;
        private ThrottledClient sruSearchClient = null;
        private Map<Long, Search> activeSearches = Collections.synchronizedMap(new HashMap<Long, Search>());

        public static void main(String[] args) throws Exception {
                new Aggregator().run(args);
        }

        @Override
        public String getName() {
                return "FCS Aggregator";
        }

        @Override
        public void initialize(Bootstrap<AggregatorConfiguration> bootstrap) {
                bootstrap.addBundle(new AssetsBundle("/assets", "/", "index.html"));
        }

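        // Dropwizard entry point: stores the configured parameters, registers the
        // REST service, then initializes the SRU clients and the scan crawler.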
        @Override
        public void run(AggregatorConfiguration config, Environment environment) throws Exception {
                params = config.aggregatorParams;
                instance = this;

                System.out.println("Using parameters: ");
                try {
                        System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter().
                                        writeValueAsString(config.aggregatorParams));
                } catch (IOException xc) {
                        // ignored: pretty-printing the parameters is informative only
                }

                environment.jersey().setUrlPattern("/rest/*");
                environment.jersey().register(new RestService());

                try {
                        init();
                } catch (Exception ex) {
                        log.error("INIT EXCEPTION", ex);
                        throw ex; // force exit
                }
        }

        public static Aggregator getInstance() {
                return instance;
        }

        public AggregatorConfiguration.Params getParams() {
                return params;
        }

        public Corpora getCorpora() {
                return scanCacheAtom.get();
        }

        public Statistics getScanStatistics() {
                return scanStatsAtom.get();
        }

        public Statistics getSearchStatistics() {
                return searchStatsAtom.get();
        }

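        /**
         * Builds the throttled scan and search clients, restores the corpora list
         * from the cache file if present, initializes the language-identification
         * components, and schedules the periodic scan (crawl) task.
         */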
        public void init() throws IOException {
                log.info("Aggregator initialization started.");
                sruScanClient = new ThrottledClient(
                                new ClarinFCSClientBuilder()
                                .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
                                .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
                                .addDefaultDataViewParsers()
                                .enableLegacySupport()
                                .buildThreadedClient(),
                                params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT,
                                params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
                );
                sruSearchClient = new ThrottledClient(
                                new ClarinFCSClientBuilder()
                                .setConnectTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
                                .setSocketTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
                                .addDefaultDataViewParsers()
                                .enableLegacySupport()
                                .buildThreadedClient(),
                                params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT,
                                params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
                );

                File corporaCacheFile = new File(params.AGGREGATOR_FILE_PATH);
                try {
                        Corpora corpora = new ObjectMapper().readValue(corporaCacheFile, Corpora.class);
                        scanCacheAtom.set(corpora);
                        log.info("corpus list read from file; number of root corpora: " + scanCacheAtom.get().getCorpora().size());
                } catch (Exception e) {
                        log.error("Error while reading cached corpora:", e);
                }

                LanguagesISO693_3.getInstance(); // force init
                initTokenizer();
                initLanguageDetector();

                ScanCrawlTask task = new ScanCrawlTask(sruScanClient,
                                params.CENTER_REGISTRY_URL, params.SCAN_MAX_DEPTH,
                                null, scanCacheAtom, corporaCacheFile, scanStatsAtom);
                scheduler.scheduleAtFixedRate(task, params.SCAN_TASK_INITIAL_DELAY,
                                params.SCAN_TASK_INTERVAL, params.getScanTaskTimeUnit());

                log.info("Aggregator initialization finished.");
        }

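        /**
         * Stops all active searches, then shuts down both SRU clients and the
         * crawl scheduler.
         */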
        public void shutdown(AggregatorConfiguration config) {
                log.info("Aggregator is shutting down.");
                for (Search search : activeSearches.values()) {
                        search.shutdown();
                }
                shutdownAndAwaitTermination(config.aggregatorParams, sruScanClient, scheduler);
                shutdownAndAwaitTermination(config.aggregatorParams, sruSearchClient, scheduler);
                log.info("Aggregator shutdown complete.");
        }

        /**
         * Starts a new search and registers it in the active searches map.
         * Returns null if there are no corpora or no query to search for.
         * This method should be thread-safe.
         */
        public Search startSearch(SRUVersion version, List<Corpus> corpora,
                        String searchString, String searchLang, int maxRecords) throws Exception {
                if (corpora.isEmpty()) {
                        // No corpora
                        return null;
                } else if (searchString.isEmpty()) {
                        // No query
                        return null;
                } else {
                        Search sr = new Search(sruSearchClient, version, searchStatsAtom.get(),
                                        corpora, searchString, searchLang, 1, maxRecords);
                        activeSearches.put(sr.getId(), sr);
                        return sr;
                }
        }

        public Search getSearchById(Long id) {
                return activeSearches.get(id);
        }

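        // Two-phase shutdown: ask the client and the scheduler to terminate, give them
        // EXECUTOR_SHUTDOWN_TIMEOUT_MS to finish, then force termination; if interrupted
        // while waiting, force termination immediately and restore the interrupt flag.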
        private static void shutdownAndAwaitTermination(AggregatorConfiguration.Params params,
                        ThrottledClient sruClient, ExecutorService scheduler) {
                try {
                        sruClient.shutdown();
                        scheduler.shutdown();
                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
                        sruClient.shutdownNow();
                        scheduler.shutdownNow();
                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
                } catch (InterruptedException ie) {
                        sruClient.shutdownNow();
                        scheduler.shutdownNow();
                        Thread.currentThread().interrupt();
                }
        }

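        // Loads the German tokenizer model from the classpath; on failure the model
        // stays null and only an error is logged, so the application can still start.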
        private void initTokenizer() {
                TokenizerModel model = null;
                try {
                        try (InputStream tokenizerModelDeAsIS = Thread.currentThread().getContextClassLoader().getResourceAsStream(DE_TOK_MODEL)) {
                                model = new TokenizerModel(tokenizerModelDeAsIS);
                        }
                } catch (IOException ex) {
                        log.error("Failed to load tokenizer model", ex);
                }
                tokenizerModel = model;
        }

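        // Builds the language detector from all bundled language profiles, using
        // standard n-gram extraction, plus a text-object factory tuned for large texts.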
        public void initLanguageDetector() throws IOException {
                List<LanguageProfile> languageProfiles = new LanguageProfileReader().readAll();
                languageDetector = LanguageDetectorBuilder
                                .create(NgramExtractors.standard())
                                .withProfiles(languageProfiles)
                                .build();

                textObjectFactory = CommonTextObjectFactories.forDetectingOnLargeText();
        }

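        // Returns the code of the detected language, or null if detection is inconclusive.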
        public String detectLanguage(String text) {
                return languageDetector.detect(textObjectFactory.forText(text)).orNull();
        }
}