open Types let new_packet page data n contd conts isLast = { p_data = data ; p_granulepos = if isLast then page.raw.granulepos else None ; p_time = if isLast then page.time else None ; p_pageno = n ; p_serialno = page.raw.serialno ; p_continued = contd ; p_continues = conts ; p_page_seq = page.raw.sequenceno ; p_identity = page.identity ; p_bos = page.raw.bos ; p_eos = if isLast then page.raw.eos else false };; type ptp_context = {mutable cont_packet : bool};; (* pcont indicates whether the previous packet is continued at this packet *) let page_to_packets cont page = let sn = page.raw.serialno in if page.raw.bos then cont := (sn, {cont_packet = false})::!cont; let rec _ptp raw_data n = let pcont = List.assoc sn !cont in let oc = pcont.cont_packet in pcont.cont_packet <- false; match raw_data with | [h] when page.raw.last_packet_complete -> ( pcont.cont_packet <- false; [< 'new_packet page h n oc false true >] ) | [h] -> ( pcont.cont_packet <- true; [< 'new_packet page h n oc true true >] ) | h::[r] when not page.raw.last_packet_complete -> ( pcont.cont_packet <- true; [< 'new_packet page h n oc false true ; 'new_packet page r n false pcont.cont_packet false >] ) | h::t -> [< 'new_packet page h n oc false false; _ptp t (n+1) >] | [] -> [< >] in _ptp page.raw.raw_data 0;; let to_packetStream pstream = let cont = ref [] in let rec _tps pstream = match pstream with parser | [< 'page ; rest >] -> [< page_to_packets cont page ; _tps rest >] | [< >] -> [< >] in _tps pstream;; let create_fresh_page packet = let raw_page = { continued = packet.p_continued; bos = packet.p_bos; eos = packet.p_eos; last_packet_complete = not packet.p_continues; granulepos = if packet.p_continues then None else packet.p_granulepos; serialno = packet.p_serialno; sequenceno = packet.p_page_seq; checksum = (0, 0); packet_sizes = [String.length packet.p_data]; raw_data = [packet.p_data] } in let page = { raw = raw_page; time = if packet.p_continues then None else packet.p_time; identity = packet.p_identity } in page;; let add_packet_to_page page packet = { page with raw = { page.raw with eos = packet.p_eos; last_packet_complete = not packet.p_continues; granulepos = if packet.p_continues then page.raw.granulepos else packet.p_granulepos; packet_sizes = page.raw.packet_sizes @ [String.length packet.p_data]; raw_data = page.raw.raw_data @ [packet.p_data] }; time = if packet.p_continues then page.time else packet.p_time };; let flush_page page = let checksum = Page.generate_crc page.raw in [< '{ page with raw = {page.raw with checksum = checksum} } >];; let packet_to_page context packet = let _flush _ = let old_page = !context in let page = create_fresh_page packet in context := Some page; match old_page with | Some op -> [< flush_page op >] | None -> [< >] in match !context with | None -> ( let page = create_fresh_page packet in context := Some page; [< >] ) | Some page -> ( if not (page.raw.serialno = packet.p_serialno) then [< _flush () >] else ( if packet.p_page_seq = page.raw.sequenceno then ( context := Some (add_packet_to_page page packet); [< >] ) else ( [< _flush () >]; ) ) );; let packetStream_to_pageStream pstream = let context = ref None in let rec _ptp _ = match pstream with parser | [< 'packet >] -> [< packet_to_page context packet; _ptp () >] | [< >] -> (match !context with | Some lp -> [< flush_page lp >] | None -> [< >]) in _ptp ();; type reconstruct_context = { rc_tf : granulePos -> float option; mutable rc_cache : packet list; mutable rc_last_gp : granulePos; mutable rc_last_packet : string; rc_ngpf : string -> string -> granulePos -> granulePos; rc_pgpf : string -> string -> granulePos -> granulePos } ;; (* reconstruct packet given a previous gp. Don't store 0 GPs *) let packet_from_last_gp crec pack = let gp = crec.rc_last_gp in let new_gp = if pack.p_continued then gp else crec.rc_ngpf crec.rc_last_packet pack.p_data gp in if not (new_gp = Some (0,0,0,0)) then crec.rc_last_gp <- new_gp; if pack.p_continued then crec.rc_last_packet <- crec.rc_last_packet ^ pack.p_data else crec.rc_last_packet <- pack.p_data; [< '{ pack with p_granulepos = new_gp; p_time = crec.rc_tf new_gp } >] (* flush the non-timestamped packets from the context record now that we have a real granulepos *) let new_gp_and_flush crec pack = let rec _ngaf l p gp = match l with | h::t -> ( let last_gp = if pack.p_continues then gp else crec.rc_pgpf h.p_data p gp in [< _ngaf t h.p_data last_gp; '{ h with p_granulepos = last_gp; p_time = crec.rc_tf last_gp } >]) | [] -> [< >] in let l = crec.rc_cache in crec.rc_cache <- []; if not (pack.p_granulepos = Some (0,0,0,0)) then crec.rc_last_gp <- pack.p_granulepos; crec.rc_last_packet <- pack.p_data; [< _ngaf l pack.p_data pack.p_granulepos; 'pack >] let reconstruct_packet context pack = let sn = pack.p_serialno in let id = pack.p_identity in let gp = pack.p_granulepos in let crec = ( if pack.p_bos then ( context := (sn, {rc_tf=Granules.granulerate_function id pack.p_data; rc_cache=[]; rc_last_gp=None; rc_last_packet=""; rc_pgpf=Granules.last_gp_function id pack.p_data; rc_ngpf=Granules.next_gp_function id pack.p_data}):: (!context); snd (List.hd !context)) else List.assoc sn !context ) in match gp with | None -> (match crec.rc_last_gp with | None -> (crec.rc_cache <- pack::crec.rc_cache; [< >]) | Some _ -> packet_from_last_gp crec pack) | Some gp -> [< new_gp_and_flush crec pack >];; let reconstruct_timing pstream = let context = ref [] in let rec _rt s = match s with parser | [< 'pack ; rest >] -> [] | [< >] -> [< >] in _rt pstream;; type repair_context = { mutable rc_packet : packet option };; let new_rc _ = {rc_packet = None} let repair_splits pstream = let context = ref [] in let rec _rs s = match s with parser | [< 'pack ; rest >] -> ( if pack.p_bos then context := (pack.p_serialno, new_rc ())::!context; let crec = List.assoc pack.p_serialno !context in if pack.p_continues then ( ( match crec.rc_packet with | None -> crec.rc_packet <- Some pack | Some p -> crec.rc_packet <- Some {p with p_data = p.p_data ^ pack.p_data} ); [< _rs rest >] ) else ( let op = crec.rc_packet in crec.rc_packet <- None ; match op with | None -> [< 'pack ; _rs rest >] | Some p -> [< '{p with p_data = p.p_data ^ pack.p_data ; p_continues = false ; p_continued = false } ; _rs rest >] ) ) | [< >] -> [< >] in _rs pstream;; module PacketInput = struct exception DontSortPacketStreamsYet type k = serialNo type s = packet let eq = (=) let get_key p = p.p_serialno let is_first p = p.p_bos let is_last p = p.p_eos let printKey s = print_oogg32 s let lt a b = raise DontSortPacketStreamsYet end;; module PacketSort = StreamSort.StreamSort (PacketInput);; let sort = PacketSort.sort;;