source: SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java @ 6093

Last change on this file since 6093 was 6093, checked in by emanuel.dima@uni-tuebingen.de, 9 years ago
  1. beta-28: disabling sampa for now
File size: 11.8 KB
Line 
package eu.clarin.sru.fcs.aggregator.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.optimaize.langdetect.LanguageDetector;
import com.optimaize.langdetect.LanguageDetectorBuilder;
import com.optimaize.langdetect.ngram.NgramExtractors;
import com.optimaize.langdetect.profiles.LanguageProfile;
import com.optimaize.langdetect.profiles.LanguageProfileReader;
import com.optimaize.langdetect.text.*;
import eu.clarin.sru.client.SRUVersion;
import eu.clarin.sru.client.fcs.ClarinFCSClientBuilder;
import eu.clarin.sru.client.fcs.ClarinFCSEndpointDescriptionParser;
import eu.clarin.sru.fcs.aggregator.client.ThrottledClient;
import eu.clarin.sru.fcs.aggregator.rest.RestService;
import eu.clarin.sru.fcs.aggregator.scan.Corpora;
import eu.clarin.sru.fcs.aggregator.scan.Corpus;
import eu.clarin.sru.fcs.aggregator.scan.ScanCrawlTask;
import eu.clarin.sru.fcs.aggregator.scan.Statistics;
import eu.clarin.sru.fcs.aggregator.search.Search;
import eu.clarin.sru.fcs.aggregator.util.LanguagesISO693;
import io.dropwizard.Application;
import io.dropwizard.assets.AssetsBundle;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.LoggerFactory;
37
38/**
39 * Main component of the Aggregator application intended to provide users access
40 * to CLARIN-FCS resources.
41 *
42 * The webapp base URL corresponds to the default behavior of displaying the
43 * main aggregator page, where the user can enter query, select the resources of
44 * CQL endpoints (as specified in the Clarin center registry), and search in
45 * these resources. The endpoints/resources selection is optional, by default
46 * all the endpoints root resources are selected.
47 *
48 * If invoked with 'x-aggregation-context' and 'query' parameter, the aggregator
49 * will pre-select provided resources and fill in the query field. This
50 * mechanism is currently used by VLO. Example: POST
51 * http://weblicht.sfs.uni-tuebingen.de/Aggregator HTTP/1.1 operation =
52 * searchRetrieve & version = 1.2 & query = bellen & x-aggregation-context =
53 * {"http://fedora.clarin-d.uni-saarland.de/sru/":["hdl:11858/00-246C-0000-0008-5F2A-0"]}
54 *
55 *
56 * Additionally, if run with the a URL query string parameter 'mode', the
57 * special behavior of the aggregator is triggered:
58 *
59 * /?mode=testing corresponds to the mode where the CQL endpoints are taken not
60 * from Clarin center repository, but from a hard-coded endpoints list; this
61 * functionality is useful for testing the development instances of endpoints,
62 * before they are moved to production. Was done to meet the request from MPI.
63 *
64 * /?mode=search corresponds to the mode where the aggregator page is requested
65 * with the already known query and (optionally) resources to search in, and if
66 * the immediate search is desired. In this case the aggregator search results
67 * page is displayed and search results of the provided query start to fill it
68 * in immediately (i.e. users don't need to click 'search' in the aggregator
69 * page). Was done to meet the request from CLARIN ERIC (Martin Wynne contacted
70 * us).
71 *
72 * /?mode=live corresponds to the mode where the information about corpora are
73 * taken not from the scan cache (crawled in advance), but loaded live, starting
74 * from the request to center registry and then performing scan operation
75 * requests on each CQL endpoint listed there. It takes time to get the
76 * corresponding responses from the endpoints, therefore the Aggregator page
77 * loads very slow in this mode. But this mode is useful for testing of the
78 * newly added or changed corpora without waiting for the next crawl.
79 *
80 *
81 * Adds Application initialization and clean up: only one SRU threaded client is
82 * used in the application, it has to be shut down when the application stops.
83 * One Languages object instance is used within the application.
84 *
85 * @author Yana Panchenko
86 * @author edima
87 *
88 * TODO: add the modes described above (except live)
89 *
90 * TODO: update comments everywhere
91 *
92 * TODO: add some basic docs
93 *
94 * TODO: Export search results to personal workspace using oauth
95 *
96 * TODO: websockets
97 *
98 * TODO: show multiple hits on the same result in multiple rows, linked visually
99 *
100 * TODO: optimise page load
101 *
102 * TODO: test it in IE, other browsers
103 *
104 * TODO: tri-state for parent collections; search + message implications
105 *
106 * TODO: improve help page text
107 *
108 */
109public class Aggregator extends Application<AggregatorConfiguration> {
110
111        private static final org.slf4j.Logger log = LoggerFactory.getLogger(Aggregator.class);
112
113        final int SEARCHES_SIZE_GC_THRESHOLD = 1000;
114        final int SEARCHES_AGE_GC_THRESHOLD = 60;
115
116        private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
117        private static Aggregator instance;
118        private AggregatorConfiguration.Params params;
119
120        private AtomicReference<Corpora> scanCacheAtom = new AtomicReference<Corpora>(new Corpora());
121        private AtomicReference<Statistics> scanStatsAtom = new AtomicReference<Statistics>(new Statistics());
122        private AtomicReference<Statistics> searchStatsAtom = new AtomicReference<Statistics>(new Statistics());
123
124        private LanguageDetector languageDetector;
125        private TextObjectFactory textObjectFactory;
126
127        private ThrottledClient sruScanClient = null;
128        private ThrottledClient sruSearchClient = null;
129        private Map<Long, Search> activeSearches = Collections.synchronizedMap(new HashMap<Long, Search>());
130
131        public static void main(String[] args) throws Exception {
132                new Aggregator().run(args);
133        }
134
135        @Override
136        public String getName() {
137                return "FCS Aggregator";
138        }
139
140        @Override
141        public void initialize(Bootstrap<AggregatorConfiguration> bootstrap) {
142                bootstrap.addBundle(new AssetsBundle("/assets", "/", "index.html", "static"));
143        }
144
145        @Override
146        public void run(AggregatorConfiguration config, Environment environment) throws Exception {
147                params = config.aggregatorParams;
148                instance = this;
149
150                List<String> wll = new ArrayList<String>();
151                for (String l : config.aggregatorParams.weblichtConfig.getAcceptedTcfLanguages()) {
152                        wll.add(LanguagesISO693.getInstance().code_3ForCode(l));
153                }
154                config.aggregatorParams.weblichtConfig.acceptedTcfLanguages = wll;
155
156                System.out.println("Using parameters: ");
157                try {
158                        System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter().
159                                        writeValueAsString(config.aggregatorParams));
160                } catch (IOException xc) {
161                }
162
163                environment.getApplicationContext().setErrorHandler(new ErrorPageHandler());
164                environment.jersey().setUrlPattern("/rest/*");
165                environment.jersey().register(new RestService());
166
167                try {
168                        init();
169                } catch (Exception ex) {
170                        log.error("INIT EXCEPTION", ex);
171                        throw ex; // force exit
172                }
173        }
174
175        public static Aggregator getInstance() {
176                return instance;
177        }
178
179        public AggregatorConfiguration.Params getParams() {
180                return params;
181        }
182
183        public Corpora getCorpora() {
184                return scanCacheAtom.get();
185        }
186
187        public Statistics getScanStatistics() {
188                return scanStatsAtom.get();
189        }
190
191        public Statistics getSearchStatistics() {
192                return searchStatsAtom.get();
193        }
194
195        public void init() throws IOException {
196                log.info("Aggregator initialization started.");
197                sruScanClient = new ThrottledClient(
198                                new ClarinFCSClientBuilder()
199                                .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
200                                .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
201                                .addDefaultDataViewParsers()
202                                .registerExtraResponseDataParser(
203                                                new ClarinFCSEndpointDescriptionParser())
204                                .enableLegacySupport()
205                                .buildThreadedClient(),
206                                params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
207                );
208                sruSearchClient = new ThrottledClient(
209                                new ClarinFCSClientBuilder()
210                                .setConnectTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
211                                .setSocketTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
212                                .addDefaultDataViewParsers()
213                                .enableLegacySupport()
214                                .buildThreadedClient(),
215                                params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
216                );
217
218                File corporaCacheFile = new File(params.AGGREGATOR_FILE_PATH);
219                File corporaOldCacheFile = new File(params.AGGREGATOR_FILE_PATH_BACKUP);
220
221                // init corpora from file
222                {
223                        Corpora corpora = null;
224                        try {
225                                corpora = new ObjectMapper().readValue(corporaCacheFile, Corpora.class);
226                        } catch (Exception xc) {
227                                log.error("Failed to load cached corpora from primary file:", xc);
228                        }
229                        if (corpora == null) {
230                                try {
231                                        corpora = new ObjectMapper().readValue(corporaOldCacheFile, Corpora.class);
232                                } catch (Exception e) {
233                                        log.error("Failed to load cached corpora from backup file:", e);
234                                }
235                        }
236                        if (corpora != null) {
237                                scanCacheAtom.set(corpora);
238                                log.info("corpus list read from file; number of root corpora: " + scanCacheAtom.get().getCorpora().size());
239                        }
240                }
241
242                LanguagesISO693.getInstance(); // force init
243                initLanguageDetector();
244
245                ScanCrawlTask task = new ScanCrawlTask(sruScanClient,
246                                params.CENTER_REGISTRY_URL, params.SCAN_MAX_DEPTH,
247                                params.additionalCQLEndpoints,
248                                null, scanCacheAtom, corporaCacheFile, corporaOldCacheFile,
249                                scanStatsAtom, searchStatsAtom);
250                scheduler.scheduleAtFixedRate(task, params.SCAN_TASK_INITIAL_DELAY,
251                                params.SCAN_TASK_INTERVAL, params.getScanTaskTimeUnit());
252
253                log.info("Aggregator initialization finished.");
254        }
255
256        public void shutdown(AggregatorConfiguration config) {
257                log.info("Aggregator is shutting down.");
258                for (Search search : activeSearches.values()) {
259                        search.shutdown();
260                }
261                shutdownAndAwaitTermination(config.aggregatorParams, sruScanClient, scheduler);
262                shutdownAndAwaitTermination(config.aggregatorParams, sruSearchClient, scheduler);
263                log.info("Aggregator shutdown complete.");
264        }
265
266        // this function should be thread-safe
267        public Search startSearch(SRUVersion version, List<Corpus> corpora,
268                        String searchString, String searchLang,
269                        int firstRecord, int maxRecords) throws Exception {
270                if (corpora.isEmpty()) {
271                        // No corpora
272                        return null;
273                } else if (searchString.isEmpty()) {
274                        // No query
275                        return null;
276                } else {
277                        Search sr = new Search(sruSearchClient, version, searchStatsAtom.get(),
278                                        corpora, searchString, searchLang, maxRecords);
279                        if (activeSearches.size() > SEARCHES_SIZE_GC_THRESHOLD) {
280                                List<Long> toBeRemoved = new ArrayList<Long>();
281                                long t0 = System.currentTimeMillis();
282                                for (Map.Entry<Long, Search> e : activeSearches.entrySet()) {
283                                        long dtmin = (t0 - e.getValue().getCreatedAt()) / 1000 / 60;
284                                        if (dtmin > SEARCHES_AGE_GC_THRESHOLD) {
285                                                log.info("removing search " + e.getKey() + ": " + dtmin + " minutes old");
286                                                toBeRemoved.add(e.getKey());
287                                        }
288                                }
289                                for (Long l : toBeRemoved) {
290                                        activeSearches.remove(l);
291                                }
292                        }
293                        activeSearches.put(sr.getId(), sr);
294                        return sr;
295                }
296        }
297
298        public Search getSearchById(Long id) {
299                return activeSearches.get(id);
300        }
301
302        private static void shutdownAndAwaitTermination(AggregatorConfiguration.Params params,
303                        ThrottledClient sruClient, ExecutorService scheduler) {
304                try {
305                        sruClient.shutdown();
306                        scheduler.shutdown();
307                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
308                        sruClient.shutdownNow();
309                        scheduler.shutdownNow();
310                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
311                } catch (InterruptedException ie) {
312                        sruClient.shutdownNow();
313                        scheduler.shutdownNow();
314                        Thread.currentThread().interrupt();
315                }
316        }
317
318        public void initLanguageDetector() throws IOException {
319                List<LanguageProfile> languageProfiles = new LanguageProfileReader().readAll();
320                languageDetector = LanguageDetectorBuilder
321                                .create(NgramExtractors.standard())
322                                .withProfiles(languageProfiles)
323                                .build();
324
325                textObjectFactory = CommonTextObjectFactories.forDetectingOnLargeText();
326        }
327
328        public String detectLanguage(String text) {
329                return languageDetector.detect(textObjectFactory.forText(text)).orNull();
330        }
331}
Note: See TracBrowser for help on using the repository browser.