module type INPUT = sig type k type s val eq : k -> k -> bool val get_key : s -> k val is_first : s -> bool val is_last : s -> bool val printKey : k -> unit val lt : s -> s -> bool end;; module StreamSort = functor (I:INPUT) -> struct exception PagesAfterEOS exception EmptyList exception EmptyStream let rec merge l = let peekStrip h = match Stream.peek h with | None -> raise EmptyStream | Some v -> v in let rec min_of l = match l with | [] -> raise EmptyList | [a] -> (a, []) | h::t -> ( let (m, r) = min_of t in if I.lt (peekStrip (snd h)) (peekStrip (snd m)) then (h, m::r) else (m, h::r) ) in match l with | [] -> [< >] | _ -> ( let ((_, mlist), rest) = min_of l in let elt = Stream.next mlist in if Stream.peek mlist = None then [< 'elt; merge rest >] else [< 'elt; merge l >] );; let sort i = let rec get_first_pages i = match (Stream.peek i) with | None -> [] | Some page when I.is_first page -> ( Stream.junk i; let q = Queue.create () in Queue.add page q; (I.get_key page, ref (Some q))::(get_first_pages i) ) | Some page -> [] in let starts = get_first_pages i in let rec get_next_page k = let cache = List.assoc k starts in match !cache with | None -> None | Some q when not (Queue.is_empty q) -> ( let h = Queue.take q in if I.is_last h then cache := None else cache := Some q; Some h ) | Some _ -> try ( let p = Stream.next i in let ik = I.get_key p in if ik = k then ( if I.is_last p then cache := None; Some p ) else ( let icache = List.assoc ik starts in (match !icache with | None -> raise PagesAfterEOS | Some q -> Queue.add p q); get_next_page k ) ) with Stream.Failure -> None in let generator k _ = get_next_page k in let rec generate_output_streams s = match s with | [] -> [] | (key,r)::t -> ( let new_stream = Stream.from (generator key) in (key, new_stream)::(generate_output_streams t) ) in generate_output_streams starts; end ;;