source: SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java @ 6123

Last change on this file since 6123 was 6123, checked in by emanuel.dima@uni-tuebingen.de, 9 years ago
  1. beta-31: support for query&mode params, and for x-aggregation-context on POST
File size: 10.5 KB
Line 
1package eu.clarin.sru.fcs.aggregator.app;
2
3import com.fasterxml.jackson.databind.ObjectMapper;
4import com.optimaize.langdetect.LanguageDetector;
5import com.optimaize.langdetect.LanguageDetectorBuilder;
6import com.optimaize.langdetect.ngram.NgramExtractors;
7import com.optimaize.langdetect.profiles.LanguageProfile;
8import com.optimaize.langdetect.profiles.LanguageProfileReader;
9import com.optimaize.langdetect.text.*;
10import eu.clarin.sru.fcs.aggregator.search.Search;
11import eu.clarin.sru.fcs.aggregator.scan.ScanCrawlTask;
12import eu.clarin.sru.fcs.aggregator.scan.Corpora;
13import eu.clarin.sru.client.SRUVersion;
14import eu.clarin.sru.client.fcs.ClarinFCSClientBuilder;
15import eu.clarin.sru.client.fcs.ClarinFCSEndpointDescriptionParser;
16import eu.clarin.sru.fcs.aggregator.client.ThrottledClient;
17import eu.clarin.sru.fcs.aggregator.scan.Corpus;
18import eu.clarin.sru.fcs.aggregator.rest.RestService;
19import eu.clarin.sru.fcs.aggregator.scan.Statistics;
20import eu.clarin.sru.fcs.aggregator.util.LanguagesISO693;
21import io.dropwizard.Application;
22import io.dropwizard.assets.AssetsBundle;
23import io.dropwizard.setup.Bootstrap;
24import io.dropwizard.setup.Environment;
25import java.io.File;
26import java.io.IOException;
27import java.util.ArrayList;
28import java.util.Collections;
29import java.util.HashMap;
30import java.util.List;
31import java.util.Map;
32import java.util.concurrent.ExecutorService;
33import java.util.concurrent.Executors;
34import java.util.concurrent.ScheduledExecutorService;
35import java.util.concurrent.atomic.AtomicReference;
36import org.eclipse.jetty.server.session.SessionHandler;
37import org.slf4j.LoggerFactory;
38
39/**
40 * Main component of the Aggregator application intended to provide users access
41 * to CLARIN-FCS resources.
42 *
43 * The webapp base URL corresponds to the default behavior of displaying the
44 * main aggregator page, where the user can enter query, select the resources of
45 * CQL endpoints (as specified in the Clarin center registry), and search in
46 * these resources. The endpoints/resources selection is optional, by default
47 * all the endpoints root resources are selected.
48 *
49 * If invoked with 'x-aggregation-context' and 'query' parameter, the aggregator
50 * will pre-select provided resources and fill in the query field. This
51 * mechanism is currently used by VLO.
52 *
53 * Example: POST
54 * http://weblicht.sfs.uni-tuebingen.de/Aggregator HTTP/1.1
55 * query = bellen & x-aggregation-context =
56 * {"http://fedora.clarin-d.uni-saarland.de/sru/":["hdl:11858/00-246C-0000-0008-5F2A-0"]}
57 *
58 * If the Aggregator web page has the URL query string parameter 'mode'
59 * set to the string 'search', and the 'query' parameter is set,
60 * then the aggregator search results for this query are immediately displayed
61 * (i.e. users don't need to click 'search' in the aggregator page).
62 * This feature has been requested initially by Martin Wynne from CLARIN ERIC
63 *
64 * @author Yana Panchenko
65 * @author edima
66 *
67 * TODO: improve help page text
68 *
69 * TODO: update comments everywhere
70 *
71 * TODO: add some basic docs
72 *
73 * TODO: Export search results to personal workspace using oauth
74 *
75 * TODO: websockets
76 *
77 * TODO: show multiple hits on the same result in multiple rows, linked visually
78 *
79 * TODO: optimise page load
80 *
81 * TODO: test it in IE, other browsers
82 *
83 * TODO: tri-state for parent collections; search + message implications
84 *
85 */
86public class Aggregator extends Application<AggregatorConfiguration> {
87
88        private static final org.slf4j.Logger log = LoggerFactory.getLogger(Aggregator.class);
89
90        final int SEARCHES_SIZE_GC_THRESHOLD = 1000;
91        final int SEARCHES_AGE_GC_THRESHOLD = 60;
92
93        private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
94        private static Aggregator instance;
95        private AggregatorConfiguration.Params params;
96
97        private AtomicReference<Corpora> scanCacheAtom = new AtomicReference<Corpora>(new Corpora());
98        private AtomicReference<Statistics> scanStatsAtom = new AtomicReference<Statistics>(new Statistics());
99        private AtomicReference<Statistics> searchStatsAtom = new AtomicReference<Statistics>(new Statistics());
100
101        private LanguageDetector languageDetector;
102        private TextObjectFactory textObjectFactory;
103
104        private ThrottledClient sruScanClient = null;
105        private ThrottledClient sruSearchClient = null;
106        private Map<Long, Search> activeSearches = Collections.synchronizedMap(new HashMap<Long, Search>());
107
108        public static void main(String[] args) throws Exception {
109                new Aggregator().run(args);
110        }
111
112        @Override
113        public String getName() {
114                return "FCS Aggregator";
115        }
116
117        @Override
118        public void initialize(Bootstrap<AggregatorConfiguration> bootstrap) {
119                bootstrap.addBundle(new AssetsBundle("/assets", "/", "index.html", "static"));
120        }
121
122        @Override
123        public void run(AggregatorConfiguration config, Environment environment) throws Exception {
124                params = config.aggregatorParams;
125                instance = this;
126
127                List<String> wll = new ArrayList<String>();
128                for (String l : config.aggregatorParams.weblichtConfig.getAcceptedTcfLanguages()) {
129                        wll.add(LanguagesISO693.getInstance().code_3ForCode(l));
130                }
131                config.aggregatorParams.weblichtConfig.acceptedTcfLanguages = wll;
132
133                System.out.println("Using parameters: ");
134                try {
135                        System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter().
136                                        writeValueAsString(config.aggregatorParams));
137                } catch (IOException xc) {
138                }
139
140                environment.getApplicationContext().setSessionHandler(new SessionHandler());
141                environment.getApplicationContext().setErrorHandler(new ErrorHandler());
142                environment.jersey().setUrlPattern("/rest/*");
143                environment.jersey().register(new RestService());
144
145                try {
146                        init();
147                } catch (Exception ex) {
148                        log.error("INIT EXCEPTION", ex);
149                        throw ex; // force exit
150                }
151        }
152
153        public static Aggregator getInstance() {
154                return instance;
155        }
156
157        public AggregatorConfiguration.Params getParams() {
158                return params;
159        }
160
161        public Corpora getCorpora() {
162                return scanCacheAtom.get();
163        }
164
165        public Statistics getScanStatistics() {
166                return scanStatsAtom.get();
167        }
168
169        public Statistics getSearchStatistics() {
170                return searchStatsAtom.get();
171        }
172
173        public void init() throws IOException {
174                log.info("Aggregator initialization started.");
175                sruScanClient = new ThrottledClient(
176                                new ClarinFCSClientBuilder()
177                                .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
178                                .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
179                                .addDefaultDataViewParsers()
180                                .registerExtraResponseDataParser(
181                                                new ClarinFCSEndpointDescriptionParser())
182                                .enableLegacySupport()
183                                .buildThreadedClient(),
184                                params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
185                );
186                sruSearchClient = new ThrottledClient(
187                                new ClarinFCSClientBuilder()
188                                .setConnectTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
189                                .setSocketTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
190                                .addDefaultDataViewParsers()
191                                .enableLegacySupport()
192                                .buildThreadedClient(),
193                                params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
194                );
195
196                File corporaCacheFile = new File(params.AGGREGATOR_FILE_PATH);
197                File corporaOldCacheFile = new File(params.AGGREGATOR_FILE_PATH_BACKUP);
198
199                // init corpora from file
200                {
201                        Corpora corpora = null;
202                        try {
203                                corpora = new ObjectMapper().readValue(corporaCacheFile, Corpora.class);
204                        } catch (Exception xc) {
205                                log.error("Failed to load cached corpora from primary file:", xc);
206                        }
207                        if (corpora == null) {
208                                try {
209                                        corpora = new ObjectMapper().readValue(corporaOldCacheFile, Corpora.class);
210                                } catch (Exception e) {
211                                        log.error("Failed to load cached corpora from backup file:", e);
212                                }
213                        }
214                        if (corpora != null) {
215                                scanCacheAtom.set(corpora);
216                                log.info("corpus list read from file; number of root corpora: " + scanCacheAtom.get().getCorpora().size());
217                        }
218                }
219
220                LanguagesISO693.getInstance(); // force init
221                initLanguageDetector();
222
223                ScanCrawlTask task = new ScanCrawlTask(sruScanClient,
224                                params.CENTER_REGISTRY_URL, params.SCAN_MAX_DEPTH,
225                                params.additionalCQLEndpoints,
226                                null, scanCacheAtom, corporaCacheFile, corporaOldCacheFile,
227                                scanStatsAtom, searchStatsAtom);
228                scheduler.scheduleAtFixedRate(task, params.SCAN_TASK_INITIAL_DELAY,
229                                params.SCAN_TASK_INTERVAL, params.getScanTaskTimeUnit());
230
231                log.info("Aggregator initialization finished.");
232        }
233
234        public void shutdown(AggregatorConfiguration config) {
235                log.info("Aggregator is shutting down.");
236                for (Search search : activeSearches.values()) {
237                        search.shutdown();
238                }
239                shutdownAndAwaitTermination(config.aggregatorParams, sruScanClient, scheduler);
240                shutdownAndAwaitTermination(config.aggregatorParams, sruSearchClient, scheduler);
241                log.info("Aggregator shutdown complete.");
242        }
243
244        // this function should be thread-safe
245        public Search startSearch(SRUVersion version, List<Corpus> corpora,
246                        String searchString, String searchLang,
247                        int firstRecord, int maxRecords) throws Exception {
248                if (corpora.isEmpty()) {
249                        // No corpora
250                        return null;
251                } else if (searchString.isEmpty()) {
252                        // No query
253                        return null;
254                } else {
255                        Search sr = new Search(sruSearchClient, version, searchStatsAtom.get(),
256                                        corpora, searchString, searchLang, maxRecords);
257                        if (activeSearches.size() > SEARCHES_SIZE_GC_THRESHOLD) {
258                                List<Long> toBeRemoved = new ArrayList<Long>();
259                                long t0 = System.currentTimeMillis();
260                                for (Map.Entry<Long, Search> e : activeSearches.entrySet()) {
261                                        long dtmin = (t0 - e.getValue().getCreatedAt()) / 1000 / 60;
262                                        if (dtmin > SEARCHES_AGE_GC_THRESHOLD) {
263                                                log.info("removing search " + e.getKey() + ": " + dtmin + " minutes old");
264                                                toBeRemoved.add(e.getKey());
265                                        }
266                                }
267                                for (Long l : toBeRemoved) {
268                                        activeSearches.remove(l);
269                                }
270                        }
271                        activeSearches.put(sr.getId(), sr);
272                        return sr;
273                }
274        }
275
276        public Search getSearchById(Long id) {
277                return activeSearches.get(id);
278        }
279
280        private static void shutdownAndAwaitTermination(AggregatorConfiguration.Params params,
281                        ThrottledClient sruClient, ExecutorService scheduler) {
282                try {
283                        sruClient.shutdown();
284                        scheduler.shutdown();
285                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
286                        sruClient.shutdownNow();
287                        scheduler.shutdownNow();
288                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
289                } catch (InterruptedException ie) {
290                        sruClient.shutdownNow();
291                        scheduler.shutdownNow();
292                        Thread.currentThread().interrupt();
293                }
294        }
295
296        public void initLanguageDetector() throws IOException {
297                List<LanguageProfile> languageProfiles = new LanguageProfileReader().readAll();
298                languageDetector = LanguageDetectorBuilder
299                                .create(NgramExtractors.standard())
300                                .withProfiles(languageProfiles)
301                                .build();
302
303                textObjectFactory = CommonTextObjectFactories.forDetectingOnLargeText();
304        }
305
306        public String detectLanguage(String text) {
307                return languageDetector.detect(textObjectFactory.forText(text)).orNull();
308        }
309}
Note: See TracBrowser for help on using the repository browser.