/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2006-2015, University of Amsterdam
                              VU University Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(rdf_persistency,
          [ rdf_attach_db/2,            % +Directory, +Options
            rdf_detach_db/0,            % Detach current DB
            rdf_current_db/1,           % -Directory
            rdf_persistency/2,          % +Graph, +Bool
            rdf_flush_journals/1,       % +Options
            rdf_persistency_property/1, % ?Property
            rdf_journal_file/2,         % ?Graph, ?JournalFile
            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
            rdf_db_to_file/2            % ?Graph, ?FileBase
          ]).
:- use_module(library(semweb/rdf_db),
              [ rdf_graph/1,
                rdf_unload_graph/1,
                rdf_statistics/1,
                rdf_load_db/1,
                rdf_retractall/4,
                rdf_create_graph/1,
                rdf_assert/4,
                rdf_update/5,
                rdf_monitor/2,
                rdf/4,
                rdf_save_db/2,
                rdf_atom_md5/3,
                rdf_current_ns/2,
                rdf_register_ns/3
              ]).
:- autoload(library(apply), [maplist/2, maplist/3, partition/4, exclude/3]).
:- autoload(library(debug), [debug/3]).
:- autoload(library(error),
            [permission_error/3, must_be/2, domain_error/2]).
:- autoload(library(filesex),
            [directory_file_path/3, make_directory_path/1]).
:- autoload(library(lists), [select/3, append/3]).
:- autoload(library(option), [option/2, option/3]).
:- autoload(library(readutil), [read_file_to_terms/3]).
:- autoload(library(socket), [gethostname/1]).
:- autoload(library(thread), [concurrent/3]).
:- autoload(library(uri), [uri_encoded/3]).

/** <module> RDF persistency plugin

This module provides persistency for rdf_db.pl, based on the
rdf_monitor/2 predicate to track changes to the repository. Where
previous versions used an autosave of the whole database in the
quick-load format of rdf_db, this version is based on a quick-load file
per source (4th argument of rdf/4) and journalling of edit operations.

The result is safe: it avoids frequent small changes to large files,
which make synchronisation and backup expensive, and it avoids long
disruption of the server while the autosave runs. Only loading large
files disrupts service for some time.

The persistent backup of the database is realised in a directory, using
a lock file to avoid corruption due to concurrent access. Each source is
represented by two files: the latest snapshot and a journal. The state
is restored by loading the snapshot and replaying the journal.
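For example (a minimal sketch; the directory name =|RDF-store|= is only
an illustration), an application attaches the store at startup, after
which all changes to the RDF database are journalled automatically and
the store is detached through at_halt/1:

    ==
    ?- rdf_attach_db('RDF-store', [concurrency(2)]).
    true.
    ==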
The predicate rdf_flush_journals/1 can be used to create fresh
snapshots and delete the journals.

@tbd If there is a complete `.new' snapshot and no journal, we should
     move the .new to the plain snapshot name as a means of recovery.

@tbd Backup of each graph using one or two files is very costly if
     there are many graphs. Although the currently used subdirectories
     avoid hitting OS limits early, this is still not ideal. Probably
     we should collect (small, older?) files and combine them into a
     single quick-load file. We could call this (similar to GIT) a
     `pack'.

@see rdf_edit.pl
*/

:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

:- meta_predicate
    no_agc(0).

:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       directory_levels(nonneg),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).

%!  rdf_attach_db(+Directory, +Options) is det.
%
%   Start persistent operations using Directory as place to store
%   files. There are several cases:
%
%     * Empty DB, existing directory
%       Load the DB from the existing directory
%
%     * Full DB, empty directory
%       Create snapshots for all sources in directory
%
%   Options:
%
%     * access(+AccessMode)
%       One of =auto= (default), =read_write= or =read_only=.
%       Read-only access implies that the RDF store is not locked.
%       It is read at startup and all modifications to the data are
%       temporary. The default =auto= mode is =read_write= if the
%       directory is writeable and the lock can be acquired.
%       Otherwise it reverts to =read_only=.
%
%     * concurrency(+Jobs)
%       Number of threads to use for loading the initial database.
%       If not provided, it is the number of CPUs as obtained from
%       the flag =cpu_count=.
%
%     * max_open_journals(+Count)
%       Maximum number of journals kept open. If not provided, the
%       default is 10. See limit_fd_pool/0.
%
%     * directory_levels(+Count)
%       Number of levels of intermediate directories for storing the
%       graph files. Default is 2.
%
%     * silent(+BoolOrBrief)
%       If =true= (default =false=), do not print informational
%       messages. If =brief=, print minimal feedback.
%
%     * log_nested_transactions(+Boolean)
%       If =true=, nested _log_ transactions are added to the journal
%       information. By default (=false=), no log-term is added for
%       nested transactions.
%
%   @error existence_error(source_sink, Directory)
%   @error permission_error(write, directory, Directory)

rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   print_message(warning,
                      error(permission_error(write, directory, Directory),
                            _)),
        print_message(warning, rdf(read_only)),
        rdf_attach_db_ro(Directory, Options)
    ).
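% Final fallback for the default =auto= access mode: the directory
% could not be resolved as an existing one above, so it may still have
% to be created. The clause below tries read-write mode and reverts to
% read-only if that raises an error.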
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).

rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   rdf_directory(Directory)
    ->  true                        % update settings?
    ;   rdf_detach_db,
        mkdir(Directory),
        lock_db(Directory),
        assert(rdf_directory(Directory)),
        assert_options(Options),
        stop_monitor,               % make sure not to register load
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ solutions(all)
                       ]),
    (   exists_directory(Directory)
    ->  access_file(Directory, write)
    ;   catch(make_directory(Directory), _, fail)
    ),
    !,
    rdf_attach_db(Directory, Options).
rdf_attach_db_rw(DirSpec, _) :-         % generate an existence or
    absolute_file_name(DirSpec,         % permission error
                       Directory,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Directory).

%!  rdf_attach_db_ro(+Directory, +Options)
%
%   Open an RDF database in read-only mode.

rdf_attach_db_ro(Directory, Options) :-
    rdf_detach_db,
    assert(rdf_directory(Directory)),
    assert_options(Options),
    stop_monitor,                   % make sure not to register load
    no_agc(load_db).

assert_options([]).
assert_options([H|T]) :-
    (   option_type(H, Check)
    ->  Check,
        assert(rdf_option(H))
    ;   true                        % ignore options we do not understand
    ),
    assert_options(T).

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(directory_levels(X),        must_be(nonneg, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).
option_type(access(X),                  must_be(oneof([read_write,read_only]), X)).

%!  rdf_persistency_property(?Property) is nondet.
%
%   True when Property is a property of the current persistent
%   database. Exposes the properties that can be passed as options to
%   rdf_attach_db/2. Specifically,
%   rdf_persistency_property(access(read_only)) is true iff the
%   database is mounted in read-only mode. In addition, the following
%   property is supported:
%
%     - directory(Dir)
%       The directory in which the database resides.

rdf_persistency_property(Property) :-
    var(Property),
    !,
    rdf_persistency_property_(Property).
rdf_persistency_property(Property) :-
    rdf_persistency_property_(Property),
    !.

rdf_persistency_property_(Property) :-
    rdf_option(Property).
rdf_persistency_property_(directory(Dir)) :-
    rdf_directory(Dir).

%!  no_agc(:Goal)
%
%   Run Goal with atom garbage collection disabled. Loading an RDF
%   database creates large amounts of atoms we *know* are not garbage.

no_agc(Goal) :-
    current_prolog_flag(agc_margin, Old),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        Goal,
        set_prolog_flag(agc_margin, Old)).

%!  rdf_detach_db is det.
%
%   Detach from the current database. Succeeds silently if no database
%   is attached. Normally called at the end of the program through
%   at_halt/1.

rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Dir))
    ->  debug(halt, 'DB Directory: ~w', [Dir]),
        save_prefixes(Dir),
        retractall(rdf_option(_)),
        retractall(source_journal_fd(_,_)),
        unlock_db(Dir)
    ;   true
    ).

%!  rdf_current_db(?Dir)
%
%   True if Dir is the current RDF persistent database.

rdf_current_db(Directory) :-
    rdf_directory(Dir),
    !,
    Dir = Directory.
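%   As a usage sketch for the public API above, a hypothetical helper
%   that reports where the attached store lives and whether it can be
%   modified:
%
%   ==
%   report_store :-
%       rdf_current_db(Dir),
%       (   rdf_persistency_property(access(read_only))
%       ->  print_message(informational,
%                         format('Read-only RDF store at ~p', [Dir]))
%       ;   print_message(informational,
%                         format('Read-write RDF store at ~p', [Dir]))
%       ).
%   ==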
%!  rdf_flush_journals(+Options)
%
%   Flush dirty journals. Options:
%
%     * min_size(+KB)
%       Only flush if the journal is over KB in size.
%     * graph(+Graph)
%       Only flush the journal of Graph.
%
%   @tbd Provide a default for min_size?

rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        size_file(File, Size),
        Size / 1024 < KB
    ->  true
    ;   create_db(Graph)
    ).

                 /*******************************
                 *             LOAD             *
                 *******************************/

%!  load_db is det.
%
%   Reload the database from the directory specified by
%   rdf_directory/1. First we find all named graphs using find_dbs/4
%   and then we load them.

load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals, Journals, Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    Triples is Triples1 - Triples0,
    message_level(Silent, Level),
    print_message(Level,
                  rdf(restore(attached(GraphCount, Triples, T/Wall)))).

load_sources(_, [], _, _) :- !.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Count),
    RunJobs is min(Count, Jobs),
    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
    make_goals(Sources, Silent, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).

%!  make_goals(+DBs, +Silent, +Index, +Total, -Goals)

make_goals([], _, _, _, []).
make_goals([DB|T0], Silent, I, Total,
           [load_source(DB, Silent, I, Total)|T]) :-
    I2 is I + 1,
    make_goals(T0, Silent, I2, Total, T).

verbosity(Silent) :-
    rdf_option(silent(Silent)),
    !.
verbosity(Silent) :-
    current_prolog_flag(verbose, silent),
    !,
    Silent = true.
verbosity(brief).

%!  concurrency(-Jobs)
%
%   Number of jobs to run concurrently.

concurrency(Jobs) :-
    rdf_option(concurrency(Jobs)),
    !.
concurrency(Jobs) :-
    current_prolog_flag(cpu_count, Jobs),
    Jobs > 0,
    !.
concurrency(1).

cpu_stat_key(1, cputime) :- !.
cpu_stat_key(_, process_cputime).

%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det.
%
%   Scan the persistent database and return a list of snapshots and
%   journals, both sorted by file size. Each term is of the form
%
%     ==
%     db(Size, Ext, DB, DBFile, Depth)
%     ==

find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Files),
    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
    maplist(db_graph, Scanned, UnsortedGraphs),
    sort(UnsortedGraphs, Graphs),
    (   consider_reindex_db(Dir, Graphs, Scanned)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
        sort(Snapshots, SnapBySize),
        sort(Journals, JournalBySize)
    ).

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, Count),
    Count > 0,
    DepthNeeded is floor(log(Count)/log(256)),
    (   maplist(depth_db(DepthNow), Scanned)
    ->  (   DepthNeeded > DepthNow
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(DepthNow))),
            fail
        )
    ;   true
    ),
    reindex_db(Dir, DepthNeeded).

db_is_snapshot(Term) :-
    arg(2, Term, trp).

db_graph(Term, DB) :-
    arg(3, Term, DB).

db_file_name(Term, File) :-
    arg(4, Term, File).

depth_db(Depth, DB) :-
    arg(5, DB, Depth).
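%   Note that db/5 terms carry the file size as their first argument;
%   sorting them with sort/2 in find_dbs/4 therefore orders the
%   snapshot and journal lists by file size.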
%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
%
%   Produces a list of db(Size, Ext, DB, File, Depth) terms for all
%   recognised RDF database files. File is the absolute path of the
%   file below the database directory Dir.

scan_db_files([], _, _, _) -->
    [].
scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
    { nofollow(Nofollow) },
    !,
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([File|T], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, File),
      db_extension(Ext),
      !,
      rdf_db_to_file(DB, Base),
      directory_file_path(Prefix, File, DBFile),
      directory_file_path(Dir, DBFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, DB, AbsFile, Depth) ],
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([D|T], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, D, SubD),
      directory_file_path(Dir, SubD, AbsD),
      exists_directory(AbsD),
      \+ read_link(AbsD, _, _),         % do not follow links
      !,
      directory_files(AbsD, SubFiles),
      SubDepth is Depth + 1
    },
    scan_db_files(SubFiles, Dir, SubD, SubDepth),
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([_|T], Dir, Prefix, Depth) -->
    scan_db_files(T, Dir, Prefix, Depth).

nofollow('.').
nofollow('..').

db_extension(trp).
db_extension(jrn).

:- public load_source/4.                % called through make_goals/5

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Count0),
    statistics(cputime, T0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    graph_triple_count(Graph, Count1),
    Count is Count1 - Count0,
    print_message(Level,
                  rdf(restore(Silent, done(Graph, T, Count, Nth, Total)))).

graph_triple_count(Graph, Count) :-
    rdf_statistics(triples_by_graph(Graph, Count)),
    !.
graph_triple_count(_, 0).

%!  attach_graph(+Graph, +Options) is det.
%
%   Load the triples from the snapshot file of Graph and replay its
%   journal.

attach_graph(Graph, Options) :-
    option(silent(Silent), Options, false),
    (   Silent == true
    ->  Level = silent
    ;   Level = informational
    ),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    (   exists_db(JournalFile)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(JournalFile, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent, done(Graph, T, Count)))).

message_level(true, silent) :- !.
message_level(_, informational).

                 /*******************************
                 *         LOAD JOURNAL         *
                 *******************************/

%!  load_journal(+File:atom, +DB:atom) is det.
%
%   Process transactions from the RDF journal File, applying them to
%   the named graph DB.

load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, In, [encoding(utf8)]),
        ( read(In, T0),
          process_journal(T0, In, DB)
        ),
        close(In)).

process_journal(end_of_file, _, _) :- !.
process_journal(Term, In, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(In, T2),
    process_journal(T2, In, DB).
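%   A journal is a plain file of Prolog terms. A small, hypothetical
%   journal might read as follows (the timestamps and the log message
%   =update= are only illustrations):
%
%   ==
%   start([time(1700000000)]).
%   assert(s, p, o).
%   begin(1, 0, 1700000050.25, update).
%   retract(s, p, o).
%   end(1, 0, []).
%   end([time(1700000100)]).
%   ==
%
%   The clauses of process_journal_term/2 below enumerate the terms
%   that may appear in a journal.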
process_journal_term(assert(S,P,O), DB) :-
    rdf_assert(S,P,O,DB).
process_journal_term(assert(S,P,O,Line), DB) :-
    rdf_assert(S,P,O,DB:Line).
process_journal_term(retract(S,P,O), DB) :-
    rdf_retractall(S,P,O,DB).
process_journal_term(retract(S,P,O,Line), DB) :-
    rdf_retractall(S,P,O,DB:Line).
process_journal_term(update(S,P,O,Action), DB) :-
    (   rdf_update(S,P,O,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
    ).
process_journal_term(start(_), _).      % journal open/close
process_journal_term(end(_), _).
process_journal_term(begin(_), _).      % logged transaction (compatibility)
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
process_journal_term(end(_,_,_), _).

                 /*******************************
                 *        CREATE JOURNAL        *
                 *******************************/

:- dynamic
    blocked_db/2,                   % DB, Reason
    transaction_message/3,          % Nesting, Time, Message
    transaction_db/3.               % Nesting, DB, Id

%!  rdf_persistency(+DB, +Bool)
%
%   Specify whether a database is persistent. Switching to =false=
%   kills the persistent state. Switching to =true= creates it.

rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    fail.
rdf_persistency(DB, false) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, persistency)),
        delete_db(DB)
    ).
rdf_persistency(DB, true) :-
    (   retract(blocked_db(DB, persistency))
    ->  create_db(DB)
    ;   true
    ).

%!  rdf_db:property_of_graph(?Property, +Graph) is nondet.
%
%   Extend rdf_graph_property/2 with new properties.

:- multifile
    rdf_db:property_of_graph/2.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).

%!  start_monitor is det.
%!  stop_monitor is det.
%
%   Start/stop monitoring the RDF database for changes and update the
%   journal.

start_monitor :-
    rdf_monitor(monitor,
                [ -assert(load)
                ]).
stop_monitor :-
    rdf_monitor(monitor,
                [ -all
                ]).

%!  monitor(+Term) is semidet.
%
%   Handle an rdf_monitor/2 callback to deal with persistency. Note
%   that the monitor calls that come from rdf_db.pl and deal with
%   database changes are serialized. They do come from different
%   threads though.

monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
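% A reset wipes the entire RDF store, so the persistent files of all
% graphs must be deleted as well (clause below).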
monitor(reset) :-                       % TBD: remove empty directories?
    forall(rdf_graph(Graph), delete_db(Graph)).
monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).
monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)).
monitor_transaction(log(_), end(N)) :-
    check_nested(N),
    retract(transaction_message(N, _, _)),
    !,
    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
    end_transactions(DBs, N).
monitor_transaction(log(Msg, DB), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Msg, _DB), end(N)) :-
    monitor_transaction(log(Msg), end(N)).

%!  check_nested(+Level) is semidet.
%
%   True if we must log this transaction. This is always the case for
%   toplevel transactions. Nested transactions are only logged if
%   log_nested_transactions(true) is defined.

check_nested(0) :- !.
check_nested(_) :-
    rdf_option(log_nested_transactions(true)).

%!  open_transaction(+DB, +Fd) is det.
%
%   Add a begin(Id, Level, Time, Message) term if a transaction
%   involves DB. Id is an incremental integer, where each database has
%   its own counter. Level is the nesting level, Time a floating point
%   timestamp and Message the message provided as argument to the log
%   message.

open_transaction(DB, Fd) :-
    transaction_message(N, Time, Msg),
    !,
    (   transaction_db(N, DB, _)
    ->  true
    ;   next_transaction_id(DB, Id),
        assert(transaction_db(N, DB, Id)),
        RoundedTime is round(Time*100)/100,
        format(Fd, '~q.~n', [begin(Id, N, RoundedTime, Msg)])
    ).
open_transaction(_,_).

%!  next_transaction_id(+DB, -Id) is det.
%
%   Id is the number to use for the next logged transaction on DB.
%   Transactions in each named graph are numbered in sequence.
%   Searching the Id of the last transaction is performed by the 2nd
%   clause, starting 1Kb from the end of the journal and doubling this
%   offset on each failure.

:- dynamic
    current_transaction_id/2.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    db_file(Journal, Path),
    exists_file(Path),
    !,
    size_file(Path, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).

iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-     % scan growing sections from the end
    Max is floor(log(Size)/log(2)),
    between(10, Max, Step),
    Offset is -(1<<Step),
    seek(In, Offset, eof, _),
    skip(In, 0'\n),                     % re-synchronise on a term boundary
    read(In, T0),
    last_transaction_id(T0, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-        % scan the whole file
    seek(In, 0, bof, _),
    read(In, T0),
    last_transaction_id(T0, In, 0, Last).

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).

%!  end_transactions(+DBs:list(atom:id), +Nesting) is det.
%
%   End a transaction that affected the given list of databases. We
%   write the list of other affected databases as an argument to the
%   end-term to facilitate fast finding of the related transactions.
%
%   In each database, the transaction is ended with a term end(Id,
%   Nesting, Others), where Id and Nesting are the transaction
%   identifier and nesting (see open_transaction/2) and Others is a
%   list of DB:Id, indicating other databases affected by the
%   transaction.

end_transactions(DBs, N) :-
    end_transactions(DBs, DBs, N).

end_transactions([], _, _).
end_transactions([DB:Id|T], DBs, N) :-
    journal_fd(DB, Fd),
    once(select(DB:Id, DBs, Others)),
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
    sync_journal(DB, Fd),
    end_transactions(T, DBs, N).

%!  sync_loaded_graphs(+Graphs)
%
%   Called after a binary triple file has been loaded that added
%   triples to the given graphs.

sync_loaded_graphs(Graphs) :-
    maplist(create_db, Graphs).

                 /*******************************
                 *        JOURNAL FILES         *
                 *******************************/

%!  journal_fd(+DB, -Stream) is det.
%
%   Get an open stream to a journal. If the journal is not open, old
%   journals are closed to satisfy the =max_open_journals= option.
%   Then the journal is opened in =append= mode. Journal files are
%   always encoded as UTF-8 for portability as well as to ensure full
%   coverage of Unicode.

journal_fd(DB, Fd) :-
    source_journal_fd(DB, Fd),
    !.
journal_fd(DB, Fd) :-
    with_mutex(rdf_journal_file,
               journal_fd_(DB, Out)),
    Fd = Out.

journal_fd_(DB, Fd) :-
    source_journal_fd(DB, Fd),
    !.
journal_fd_(DB, Fd) :-
    limit_fd_pool,
    db_files(DB, _Snapshot, Journal),
    open_db(Journal, append, Fd,
            [ close_on_abort(false)
            ]),
    time_stamp(Now),
    format(Fd, '~q.~n', [start([time(Now)])]),
    assert(source_journal_fd(DB, Fd)).  % new one at the end

%!  limit_fd_pool is det.
%
%   Limit the number of open journals to max_open_journals (10). Note
%   that calls from rdf_monitor/2 are issued in different threads, but
%   as they are part of write operations they are fully synchronised.

limit_fd_pool :-
    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
    !,
    (   rdf_option(max_open_journals(Max))
    ->  true
    ;   Max = 10
    ),
    Close is N - Max,
    forall(between(1, Close, _),
           close_oldest_journal).
limit_fd_pool.

close_oldest_journal :-
    source_journal_fd(DB, _Fd),
    !,
    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
    close_journal(DB).
close_oldest_journal.

%!  sync_journal(+DB, +Fd)
%
%   Sync the journal represented by database and stream. If the DB is
%   involved in a transaction, there is no point flushing until the
%   end of the transaction.

sync_journal(DB, _) :-
    transaction_db(_, DB, _),
    !.
sync_journal(_, Fd) :-
    flush_output(Fd).

%!  close_journal(+DB) is det.
%
%   Close the journal associated with DB if it is open.

close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

close_journal_(DB) :-
    (   retract(source_journal_fd(DB, Fd))
    ->  time_stamp(Now),
        format(Fd, '~q.~n', [end([time(Now)])]),
        close(Fd, [force(true)])
    ;   true
    ).

%!  close_journals
%
%   Close all open journals.

close_journals :-
    forall(source_journal_fd(DB, _),
           catch(close_journal(DB), E,
                 print_message(error, E))).

%!  create_db(+Graph)
%
%   Create a saved version of Graph in the corresponding file, close
%   and delete the journal.

create_db(Graph) :-
    \+ rdf(_,_,_,Graph),
    !,
    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
    delete_db(Graph).
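%   Save a non-empty graph. The snapshot is first written to
%   <Snapshot>.new; only after rdf_save_db/2 completes is the journal
%   deleted and the .new file renamed over the old snapshot. A crash
%   may therefore leave an orphaned .new file (see the @tbd note in
%   the module header), but never a truncated snapshot under the live
%   name.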
create_db(Graph) :-
    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
    close_journal(Graph),
    db_abs_files(Graph, Snapshot, Journal),
    atom_concat(Snapshot, '.new', NewSnapshot),
    (   catch(( create_directory_levels(Snapshot),
                rdf_save_db(NewSnapshot, Graph)
              ), Error,
              ( print_message(warning, Error),
                fail
              ))
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        rename_file(NewSnapshot, Snapshot),
        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
    ;   catch(delete_file(NewSnapshot), _, true)
    ).

%!  delete_db(+DB)
%
%   Remove the snapshot and journal file for DB.

delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

delete_db_(DB) :-
    close_journal_(DB),
    db_abs_files(DB, Snapshot, Journal),
    !,
    (   exists_file(Journal)
    ->  delete_file(Journal)
    ;   true
    ),
    (   exists_file(Snapshot)
    ->  delete_file(Snapshot)
    ;   true
    ).
delete_db_(_).

                 /*******************************
                 *            LOCKING           *
                 *******************************/

%!  lock_db(+Dir)
%
%   Lock the database directory Dir.

lock_db(Dir) :-
    lockfile(Dir, File),
    catch(open(File, update, Out, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, PID)
    ->  true
    ;   PID = 0                     % TBD: Fix in Prolog
    ),
    time_stamp(Now),
    gethostname(Host),
    format(Out, '/* RDF Database is in use */~n~n', []),
    format(Out, '~q.~n', [ locked([ time(Now),
                                    pid(PID),
                                    host(Host)
                                  ])
                         ]),
    flush_output(Out),
    set_end_of_stream(Out),
    assert(rdf_lock(Dir, lock(Out, File))),
    at_halt(unlock_db(Dir)).

locked_error(lock, Dir) :-
    lockfile(Dir, File),
    (   catch(read_file_to_terms(File, Terms, []), _, fail),
        Terms = [locked(Args)]
    ->  Context = rdf_locked(Args)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).

%!  unlock_db(+Dir) is det.
%!  unlock_db(+Stream, +File) is det.

unlock_db(Dir) :-
    retract(rdf_lock(Dir, lock(Out, File))),
    !,
    unlock_db(Out, File).
unlock_db(_).

unlock_db(Out, File) :-
    close(Out),
    delete_file(File).

                 /*******************************
                 *           FILENAMES          *
                 *******************************/

lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, /, lock], LockFile).

directory_levels(Levels) :-
    rdf_option(directory_levels(Levels)),
    !.
directory_levels(2).

db_file(Base, File) :-
    rdf_directory(Dir),
    directory_levels(Levels),
    db_file(Dir, Base, Levels, File).

db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, Segments, [Base]),
    atomic_list_concat([Dir|Segments], /, File).

open_db(Base, Mode, Stream, Options) :-
    db_file(Base, File),
    create_directory_levels(File),
    open(File, Mode, Stream, [encoding(utf8)|Options]).

create_directory_levels(_File) :-
    rdf_option(directory_levels(0)),
    !.
create_directory_levels(File) :-
    file_directory_name(File, Dir),
    make_directory_path(Dir).

exists_db(Base) :-
    db_file(Base, File),
    exists_file(File).

%!  dir_levels(+File, +Levels, ?Segments, ?Tail) is det.
%
%   Create a list of intermediate directory names for File. Each
%   directory consists of two hexadecimal digits.

dir_levels(_, 0, Segments, Segments) :- !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Hash),
    create_dir_levels(Levels, 0, Hash, Segments, Tail).

create_dir_levels(0, _, _, Segments, Segments) :- !.
create_dir_levels(N, S, Hash, [S1|Segments0], Tail) :-
    sub_atom(Hash, S, 2, _, S1),
    S2 is S+2,
    N2 is N-1,
    create_dir_levels(N2, S2, Hash, Segments0, Tail).
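%   For example (hypothetical hash value): with the default two
%   directory levels and rdf_atom_md5/3 computing a hash that starts
%   with =|a1f3|=, the file base =|mygraph|= is mapped to
%   =|Dir/a1/f3/mygraph|=.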
%!  db_files(+DB, -Snapshot, -Journal).
%!  db_files(-DB, +Snapshot, -Journal).
%!  db_files(-DB, -Snapshot, +Journal).
%
%   True if named graph DB is represented by the files Snapshot and
%   Journal. The filenames are local to the directory representing the
%   store.

db_files(DB, Snapshot, Journal) :-
    nonvar(DB),
    !,
    rdf_db_to_file(DB, Base),
    atom_concat(Base, '.trp', Snapshot),
    atom_concat(Base, '.jrn', Journal).
db_files(DB, Snapshot, Journal) :-
    nonvar(Snapshot),
    !,
    atom_concat(Base, '.trp', Snapshot),
    atom_concat(Base, '.jrn', Journal),
    rdf_db_to_file(DB, Base).
db_files(DB, Snapshot, Journal) :-
    nonvar(Journal),
    !,
    atom_concat(Base, '.jrn', Journal),
    atom_concat(Base, '.trp', Snapshot),
    rdf_db_to_file(DB, Base).

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, Snapshot0, Journal0),
    db_file(Snapshot0, Snapshot),
    db_file(Journal0, Journal).

%!  rdf_journal_file(+Graph, -File) is semidet.
%!  rdf_journal_file(-Graph, -File) is nondet.
%
%   True if File is the name of the existing journal file for Graph.

rdf_journal_file(Graph, Journal) :-
    (   var(Graph)
    ->  rdf_graph(Graph)
    ;   true
    ),
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).

%!  rdf_snapshot_file(+Graph, -File) is semidet.
%!  rdf_snapshot_file(-Graph, -File) is nondet.
%
%   True if File is the name of the existing snapshot file for Graph.

rdf_snapshot_file(Graph, Snapshot) :-
    (   var(Graph)
    ->  rdf_graph(Graph)            % also pick the empty graphs
    ;   true
    ),
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).

%!  rdf_db_to_file(+DB, -File) is det.
%!  rdf_db_to_file(-DB, +File) is det.
%
%   Translate between the database encoding (often a file name or URL)
%   and the name we store in the directory. We keep a cache for two
%   reasons. Speed, but much more important is that the mapping of
%   raw --> encoded provided by www_form_encode/2 is not guaranteed to
%   be unique by the W3C standards.

rdf_db_to_file(DB, File) :-
    file_base_db(File, DB),
    !.
rdf_db_to_file(DB, File) :-
    url_to_filename(DB, File),
    assert(file_base_db(File, DB)).

%!  url_to_filename(+URL, -FileName) is det.
%!  url_to_filename(-URL, +FileName) is det.
%
%   Turn a valid URL into a filename. Earlier versions used
%   www_form_encode/2, but this can produce characters that are not
%   valid in filenames. We use the same encoding as www_form_encode/2,
%   but with our own rules for allowed characters. The only
%   requirement is that we avoid any filename-special character in
%   use. The current encoding uses US-ASCII alphanumerical characters,
%   =|_|= and %XX escapes, where XX are two hexadecimal digits.

url_to_filename(URL, FileName) :-
    atomic(URL),
    !,
    atom_codes(URL, Codes),
    phrase(url_encode(EncCodes), Codes),
    atom_codes(FileName, EncCodes).
url_to_filename(URL, FileName) :-
    uri_encoded(path, URL, FileName).

url_encode([0'+|T]) -->
    " ",
    !,
    url_encode(T).
url_encode([C|T]) -->
    alphanum(C),
    !,
    url_encode(T).
url_encode([C|T]) -->
    no_enc_extra(C),
    !,
    url_encode(T).
url_encode(Enc) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", Codes),
      append(Codes, T, Enc)
    },
    url_encode(T).
url_encode([]) -->
    eos,
    !.
url_encode([0'%,D1,D2|T]) -->
    [C],
    { Dv1 is (C>>4 /\ 0xf),
      Dv2 is (C /\ 0xf),
      code_type(D1, xdigit(Dv1)),
      code_type(D2, xdigit(Dv2))
    },
    url_encode(T).

eos([], []).

alphanum(C) -->
    [C],
    { C < 128,                      % US-ASCII
      code_type(C, alnum)
    }.

no_enc_extra(0'_) --> "_".

                 /*******************************
                 *            REINDEX           *
                 *******************************/

%!  reindex_db(+Dir, +Levels)
%
%   Reindex the database by creating intermediate directories.

reindex_db(Dir, Levels) :-
    directory_files(Dir, Files),
    reindex_files(Files, Dir, '.', 0, Levels),
    remove_empty_directories(Files, Dir).

reindex_files([], _, _, _, _).
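% The first clause below skips the '.' and '..' entries; the second
% moves a database file to the location dictated by the new number of
% directory levels.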
reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
    nofollow(Nofollow),
    !,
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
    CLevel \== Levels,
    file_name_extension(_Base, Ext, File),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, File, DBFile),
    directory_file_path(Dir, DBFile, OldPath),
    db_file(Dir, File, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
    directory_file_path(Prefix, D, SubD),
    directory_file_path(Dir, SubD, AbsD),
    exists_directory(AbsD),
    \+ read_link(AbsD, _, _),           % do not follow links
    !,
    directory_files(AbsD, SubFiles),
    CLevel2 is CLevel + 1,
    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
    reindex_files(Files, Dir, Prefix, CLevel, Levels).

remove_empty_directories([], _).
remove_empty_directories([File|Files], Dir) :-
    \+ nofollow(File),
    directory_file_path(Dir, File, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, Content),
    exclude(nofollow, Content, RealContent),
    (   RealContent == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   remove_empty_directories(RealContent, Path)
    ),
    remove_empty_directories(Files, Dir).
remove_empty_directories([_|Files], Dir) :-
    remove_empty_directories(Files, Dir).

                 /*******************************
                 *           PREFIXES           *
                 *******************************/

save_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
                       write_prefixes(Out),
                       close(Out)).

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).

%!  load_prefixes(+RDFDBDir) is det.
%
%   If the file RDFDBDir/prefixes.db exists, load the prefixes. The
%   prefixes are registered using rdf_register_ns/3. Possible errors
%   because the prefix definitions have changed are printed as
%   warnings, retaining the old definition. Note that changing
%   prefixes generally requires reloading all RDF from the source.

load_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    (   exists_file(PrefixFile)
    ->  setup_call_cleanup(open(PrefixFile, read, In, [encoding(utf8)]),
                           read_prefixes(In),
                           close(In))
    ;   true
    ).

read_prefixes(Stream) :-
    read_term(Stream, T0, []),
    read_prefixes(T0, Stream).

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Alias, URI), Stream) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []), E,
          print_message(warning, E)),
    read_term(Stream, T, []),
    read_prefixes(T, Stream).
read_prefixes(Term, _) :-
    domain_error(prefix_term, Term).
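%   A prefixes.db file is an ordinary Prolog file holding prefix/2
%   terms, as written by write_prefixes/1 above. For example
%   (hypothetical content):
%
%   ==
%   % Snapshot of defined RDF prefixes
%
%   prefix(foaf, 'http://xmlns.com/foaf/0.1/').
%   prefix(rdfs, 'http://www.w3.org/2000/01/rdf-schema#').
%   ==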
                 /*******************************
                 *             UTIL             *
                 *******************************/

%!  mkdir(+Directory)
%
%   Create a directory if it does not already exist.

mkdir(Directory) :-
    exists_directory(Directory),
    !.
mkdir(Directory) :-
    make_directory(Directory).

%!  time_stamp(-Integer)
%
%   Return the current time, rounded to an integer.

time_stamp(Int) :-
    get_time(Now),
    Int is round(Now).

                 /*******************************
                 *           MESSAGES           *
                 *******************************/

:- multifile
    prolog:message/3,
    prolog:message_context/3.

prolog:message(rdf(Term)) -->
    message(Term).

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time]
    ].
                                                % attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
                                                % load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
message(restore(_, done(_, Time, Count, _Nth, _Total))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
                                                % journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
                                                % directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].

silent_message(_Action) --> [].

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, Base) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
      flush
    ].
brief_message(_) --> [].

prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Time), Args),
      memberchk(pid(Pid), Args),
      format_time(string(S), '%+', Time)
    },
    [ nl,
      'locked at ~s by process id ~w'-[S,Pid]
    ].