source: SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java @ 5897

Last change on this file since 5897 was 5897, checked in by emanuel.dima@uni-tuebingen.de, 9 years ago

fixed language code translations (now using ISO 639-3) + misc

File size: 10.3 KB
Line 
1package eu.clarin.sru.fcs.aggregator.app;
2
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.clarin.sru.client.SRUVersion;
import eu.clarin.sru.client.fcs.ClarinFCSClientBuilder;
import eu.clarin.sru.fcs.aggregator.client.ThrottledClient;
import eu.clarin.sru.fcs.aggregator.lang.LanguagesISO693_3;
import eu.clarin.sru.fcs.aggregator.rest.RestService;
import eu.clarin.sru.fcs.aggregator.scan.Corpora;
import eu.clarin.sru.fcs.aggregator.scan.Corpus;
import eu.clarin.sru.fcs.aggregator.scan.ScanCrawlTask;
import eu.clarin.sru.fcs.aggregator.scan.Statistics;
import eu.clarin.sru.fcs.aggregator.search.Search;
import io.dropwizard.Application;
import io.dropwizard.assets.AssetsBundle;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import opennlp.tools.tokenize.TokenizerModel;
import org.slf4j.LoggerFactory;
31
32/**
33 * Main component of the Aggregator application intended to provide users access
34 * to CLARIN-FCS resources.
35 *
36 * The webapp base URL corresponds to the default behavior of displaying the
37 * main aggregator page, where the user can enter query, select the resources of
38 * CQL endpoints (as specified in the Clarin center registry), and search in
39 * these resources. The endpoints/resources selection is optional, by default
40 * all the endpoints root resources are selected.
41 *
42 * If invoked with 'x-aggregation-context' and 'query' parameter, the aggregator
43 * will pre-select provided resources and fill in the query field. This
44 * mechanism is currently used by VLO. Example: POST
45 * http://weblicht.sfs.uni-tuebingen.de/Aggregator HTTP/1.1 operation =
46 * searchRetrieve & version = 1.2 & query = bellen & x-aggregation-context =
47 * {"http://fedora.clarin-d.uni-saarland.de/sru/":["hdl:11858/00-246C-0000-0008-5F2A-0"]}
48 *
49 *
50 * Additionally, if run with a URL query string parameter 'mode', the
51 * special behavior of the aggregator is triggered:
52 *
53 * /?mode=testing corresponds to the mode where the CQL endpoints are taken not
54 * from Clarin center repository, but from a hard-coded endpoints list; this
55 * functionality is useful for testing the development instances of endpoints,
56 * before they are moved to production. Was done to meet the request from MPI.
57 *
58 * /?mode=search corresponds to the mode where the aggregator page is requested
59 * with the already known query and (optionally) resources to search in, and if
60 * the immediate search is desired. In this case the aggregator search results
61 * page is displayed and search results of the provided query start to fill it
62 * in immediately (i.e. users don't need to click 'search' in the aggregator
63 * page). Was done to meet the request from CLARIN ERIC (Martin Wynne contacted
64 * us).
65 *
66 * /?mode=live corresponds to the mode where the information about corpora is
67 * taken not from the scan cache (crawled in advance), but loaded live, starting
68 * from the request to center registry and then performing scan operation
69 * requests on each CQL endpoint listed there. It takes time to get the
70 * corresponding responses from the endpoints, therefore the Aggregator page
71 * loads very slowly in this mode. But this mode is useful for testing the
72 * newly added or changed corpora without waiting for the next crawl.
73 *
74 *
75 * Adds Application initialization and clean up: only one SRU threaded client is
76 * used in the application, it has to be shut down when the application stops.
77 * One Languages object instance is used within the application.
78 *
79 * @author Yana Panchenko
80 * @author edima
81 *
82 * TODO: make language show nicely in the UI
83 *
84 * TODO: use selected visible corpus for search
85 *
86 * TODO: use language selection to hide corpora
87 *
88 * TODO: support new spec-compatible centres, see Oliver's mail ...............
89 *
90 * TODO: disable popups easily
91 *
92 * TODO: zoom into the results from a corpus, allow functionality only for the
93 * view (search for next set of results)
94 *
95 * TODO: Fix activeSearch memory leak (gc searches older than...)
96 *
97 * TODO: Use weblicht with results
98 *
99 * TODO: Export to personal workspace as csv, excel, tcf, plain text
100 *
101 * TODO: Download to personal workspace as csv, excel, tcf, plain text
102 *
103 * TODO: use SRUClient's extraResponseData POJOs
104 *
105 * TODO: websockets
106 *
107 * TODO: atomic replace of cached corpora (file)
108 *
109 * TODO: show multiple hits on the same result in multiple rows, linked visually
110 *
111 */
112public class Aggregator extends Application<AggregatorConfiguration> {
113
114        private static final org.slf4j.Logger log = LoggerFactory.getLogger(Aggregator.class);
115
116        public static final String DE_TOK_MODEL = "tokenizer/de-tuebadz-8.0-token.bin";
117
118        private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
119        private static Aggregator instance;
120        private AggregatorConfiguration.Params params;
121
122        private AtomicReference<Corpora> scanCacheAtom = new AtomicReference<Corpora>(new Corpora());
123        private AtomicReference<Statistics> scanStatsAtom = new AtomicReference<Statistics>(new Statistics());
124        private AtomicReference<Statistics> searchStatsAtom = new AtomicReference<Statistics>(new Statistics());
125
126        private TokenizerModel model;
127        private ThrottledClient sruScanClient = null;
128        private ThrottledClient sruSearchClient = null;
129        private Map<Long, Search> activeSearches = Collections.synchronizedMap(new HashMap<Long, Search>());
130
131        public static void main(String[] args) throws Exception {
132                new Aggregator().run(args);
133        }
134
135        @Override
136        public String getName() {
137                return "FCS Aggregator";
138        }
139
140        @Override
141        public void initialize(Bootstrap<AggregatorConfiguration> bootstrap) {
142                bootstrap.addBundle(new AssetsBundle("/assets", "/", "index.html"));
143        }
144
145        @Override
146        public void run(AggregatorConfiguration config, Environment environment) {
147                params = config.aggregatorParams;
148                instance = this;
149
150                System.out.println("Using parameters: ");
151                try {
152                        System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter().
153                                        writeValueAsString(config.aggregatorParams));
154                } catch (IOException xc) {
155                }
156
157                environment.jersey().setUrlPattern("/rest/*");
158                environment.jersey().register(new RestService());
159
160                try {
161                        init();
162                } catch (Exception ex) {
163                        log.error("INIT EXCEPTION", ex);
164                        throw ex; // force exit
165                }
166        }
167
168        public static Aggregator getInstance() {
169                return instance;
170        }
171
172        public AggregatorConfiguration.Params getParams() {
173                return params;
174        }
175
176        public Corpora getCorpora() {
177                return scanCacheAtom.get();
178        }
179
180        public Statistics getScanStatistics() {
181                return scanStatsAtom.get();
182        }
183
184        public Statistics getSearchStatistics() {
185                return searchStatsAtom.get();
186        }
187
188        public void init() {
189                log.info("Aggregator initialization started.");
190                sruScanClient = new ThrottledClient(
191                                new ClarinFCSClientBuilder()
192                                .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
193                                .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
194                                .addDefaultDataViewParsers()
195                                .enableLegacySupport()
196                                .buildThreadedClient());
197                sruSearchClient = new ThrottledClient(
198                                new ClarinFCSClientBuilder()
199                                .setConnectTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
200                                .setSocketTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
201                                .addDefaultDataViewParsers()
202                                .enableLegacySupport()
203                                .buildThreadedClient());
204
205                File corporaCacheFile = new File(params.AGGREGATOR_FILE_PATH);
206                try {
207                        Corpora corpora = new ObjectMapper().readValue(corporaCacheFile, Corpora.class);
208                        scanCacheAtom.set(corpora);
209                        log.info("corpus list read from file; number of root corpora: " + scanCacheAtom.get().getCorpora().size());
210                } catch (Exception e) {
211                        log.error("Error while reading cached corpora:", e);
212                }
213
214                LanguagesISO693_3.getInstance(); // force init
215                model = setUpTokenizers();
216
217                ScanCrawlTask task = new ScanCrawlTask(sruScanClient,
218                                params.CENTER_REGISTRY_URL, params.SCAN_MAX_DEPTH,
219                                null, scanCacheAtom, corporaCacheFile, scanStatsAtom);
220                scheduler.scheduleAtFixedRate(task, params.SCAN_TASK_INITIAL_DELAY,
221                                params.SCAN_TASK_INTERVAL, params.getScanTaskTimeUnit());
222
223                log.info("Aggregator initialization finished.");
224        }
225
226        public void shutdown(AggregatorConfiguration config) {
227                log.info("Aggregator is shutting down.");
228                for (Search search : activeSearches.values()) {
229                        search.shutdown();
230                }
231                shutdownAndAwaitTermination(config.aggregatorParams, sruScanClient, scheduler);
232                shutdownAndAwaitTermination(config.aggregatorParams, sruSearchClient, scheduler);
233                log.info("Aggregator shutdown complete.");
234        }
235
236        // this function should be thread-safe
237        public Search startSearch(SRUVersion version, List<Corpus> corpora, String searchString, String searchLang, int maxRecords) throws Exception {
238                if (corpora.isEmpty()) {
239                        // No corpora
240                        return null;
241                } else if (searchString.isEmpty()) {
242                        // No query
243                        return null;
244                } else {
245                        Search sr = new Search(sruSearchClient, version, searchStatsAtom.get(),
246                                        corpora, searchString, searchLang, 1, maxRecords);
247                        activeSearches.put(sr.getId(), sr);
248                        return sr;
249                }
250        }
251
252        public Search getSearchById(Long id) {
253                return activeSearches.get(id);
254        }
255
256        private static void shutdownAndAwaitTermination(AggregatorConfiguration.Params params,
257                        ThrottledClient sruClient, ExecutorService scheduler) {
258                try {
259                        sruClient.shutdown();
260                        scheduler.shutdown();
261                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
262                        sruClient.shutdownNow();
263                        scheduler.shutdownNow();
264                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
265                } catch (InterruptedException ie) {
266                        sruClient.shutdownNow();
267                        scheduler.shutdownNow();
268                        Thread.currentThread().interrupt();
269                }
270        }
271
272        private static TokenizerModel setUpTokenizers() {
273                TokenizerModel model = null;
274                try {
275                        try (InputStream tokenizerModelDeAsIS = Thread.currentThread().getContextClassLoader().getResourceAsStream(DE_TOK_MODEL)) {
276                                model = new TokenizerModel(tokenizerModelDeAsIS);
277                        }
278                } catch (IOException ex) {
279                        log.error("Failed to load tokenizer model", ex);
280                }
281                return model;
282        }
283
284//              filter = new EndpointUrlFilterAllow("lindat");
285//              filter = new EndpointUrlFilterDeny("leipzig");
286//              filter = new EndpointUrlFilterAllow("leipzig", "mpi.nl");
287//              filter = new EndpointUrlFilterAllow("lindat");
288}
Note: See TracBrowser for help on using the repository browser.