Use the following code to help facilitate the on - demand loading of data from a System.IO.Stream.

public static class StreamExtensions
{
public static ITransactionResult /*System.IAsyncResult*/ BeginCopyTo(this System.IO.Stream source, System.IO.Stream dest, int bufferSize, System.Action<long, byte[], int, int> writeFunction)
{
return new StreamCopyTransaction(source, dest, bufferSize, writeFunction);
}

#region DownloadReference

internal sealed class DownloadReference : CommonDisposable
{
//Should be done at CommonDisposable level?
internal static System.TimeSpan MaxLifeTime = System.TimeSpan.FromHours(1 /* + GetObjectLifeTime()*/);

public readonly System.DateTimeOffset Created = System.DateTimeOffset.UtcNow;

public System.Uri Location { get; internal set; }

public System.IDisposable Request { get; internal set; }

public System.IDisposable Response { get; internal set; }

public System.IO.Stream Result { get; internal set; }

internal long MaximumLength;

public override void Dispose()
{
ShouldDispose = false == Result.CanRead && (System.DateTimeOffset.UtcNow - Created) > MaxLifeTime;

if (false == ShouldDispose) return;

if (Location != null) Location = null;

if (Response != null) { Response.Dispose(); Response = null; }

if (Request != null) { Request.Dispose(); Request = null; }

if (Result != null) { Result.Dispose(); Result = null; }

base.Dispose();
}

internal DownloadReference(System.Uri location)
: base(false)
{
if ((Location = location) == null) throw new System.ArgumentNullException("location");

//var request = (System.Net.HttpWebRequest)System.Net.HttpWebRequest.Create(Location);

//Request = (System.IDisposable)request;

//var response = request.GetResponse();

//Response = response;

//Result = response.GetResponseStream();
}

~DownloadReference() { Dispose(); }

internal class DisposableHttpWebRequest : CommonDisposable
{
internal System.Net.HttpWebRequest Request;

internal System.IO.Stream ResponseStream;

public readonly System.DateTimeOffset Created = System.DateTimeOffset.UtcNow;

public DisposableHttpWebRequest(System.Net.HttpWebRequest request)
: base(false)
{
Request = request;
}

public override void Dispose()
{
ShouldDispose = false == ResponseStream.CanRead && (System.DateTimeOffset.UtcNow - Created) > MaxLifeTime;

if (false == ShouldDispose) return;

base.Dispose();

Request.Abort();

Request = null;

ResponseStream.Dispose();

ResponseStream = null;
}
}

public static DownloadReference HttpWebRequestDownload(System.Uri location)
{
if (location == null || false == location.OriginalString.StartsWith(System.Uri.UriSchemeHttp)) return null;

var request = new DisposableHttpWebRequest((System.Net.HttpWebRequest)System.Net.HttpWebRequest.Create(location));

var response = request.Request.GetResponse();

return new DownloadReference(location)
{
Request = request,
Response = response,
Result = request.ResponseStream = response.GetResponseStream(),
MaximumLength = response.ContentLength
};
}

public static DownloadReference FileDownload(System.Uri location)
{
return WebClientDownload(location);
}

public static DownloadReference WebClientDownload(System.Uri location)
{
if (location == null) throw new System.ArgumentNullException("location");

var webClient = new System.Net.WebClient();

var d = new DownloadReference(location)
{
Request = webClient,
Result = webClient.OpenRead(location)
};

if (webClient.ResponseHeaders != null)
{
string contentLength = webClient.ResponseHeaders["Content-Length"];

if (false == string.IsNullOrWhiteSpace(contentLength)) long.TryParse(contentLength, out d.MaximumLength);
}

return d;
}
}

#endregion

#region Download \ TryDownload

public static System.IO.Stream HttpWebRequestDownload(System.Uri location)
{
DownloadReference d = DownloadReference.HttpWebRequestDownload(location);

return d.Result;
}

public static bool TryHttpWebRequestDownload(System.Uri location, out System.IO.Stream result)
{
result = null;

try
{
result = HttpWebRequestDownload(location);

return result != null;
}
catch (System.Exception ex)
{
Common.Extensions.Exception.ExceptionExtensions.TryRaiseTaggedException(result, ex.Message, ex);

return false;
}
}

public static System.IO.Stream FileDownload(System.Uri location)
{
DownloadReference d = DownloadReference.FileDownload(location);

return d.Result;
}

public static bool TryFileDownload(System.Uri location, out System.IO.Stream result)
{
result = null;

try
{
result = FileDownload(location);

return result != null;
}
catch (System.Exception ex)
{
Common.Extensions.Exception.ExceptionExtensions.TryRaiseTaggedException(result, ex.Message, ex);

return false;
}
}

public static System.IO.Stream WebClientDownload(System.Uri location)
{
DownloadReference d = DownloadReference.WebClientDownload(location);

return d.Result;
}

public static bool TryWebClientDownload(System.Uri location, out System.IO.Stream result)
{
result = null;

try
{
result = WebClientDownload(location);

return result != null;
}
catch (System.Exception ex)
{
Common.Extensions.Exception.ExceptionExtensions.TryRaiseTaggedException(result, ex.Message, ex);

return false;
}
}

#endregion

//Creative...

public delegate void EventHandlerEx<in T>(object sender, T t);

//Interface

public interface ITransactionResult : System.IAsyncResult, IDisposed
{
void CancelTransaction();

bool IsTransactionCanceled { get; }

bool IsTransactionDone { get; }

event EventHandlerEx<ITransactionResult> TransactionRead;

event EventHandlerEx<ITransactionResult> TransactionWrite;

event EventHandlerEx<ITransactionResult> TransactionCompleted;

event EventHandlerEx<ITransactionResult> TransactionCancelled;
}

//public Abstraction

public abstract class TransactionBase : Common.BaseDisposable, ITransactionResult
{
#region Statics

static void HandleCompleted(object sender, ITransactionResult e)
{
e = e ?? sender as TransactionBase;

if (e == null) return;

if (e is TransactionBase) ((TransactionBase)e).IsTransactionDone = true;

e.Dispose();
}

static void HandleRead(object sender, ITransactionResult e)
{
//if (e == null && false == sct is TransactionBase) return;
}

static void HandleWrite(object sender, ITransactionResult e)
{
e = e ?? sender as TransactionBase;

if (e == null) return;

if (e is TransactionBase)
{
TransactionBase tb = ((TransactionBase)e);

tb.TransactionOffset += tb.LastTransactionAmount;

tb = null;
}
}

static void HandleCancelled(object sender, ITransactionResult e)
{
e = e ?? sender as TransactionBase;

if (e == null) return;

if (e is TransactionBase)
{
TransactionBase tb = ((TransactionBase)e);

tb.TransactionCompleted(sender, e);

tb = null;
}
}

internal protected static void RaiseCompleted(TransactionBase tb, object sender = null) { tb.TransactionCompleted(sender, tb); }

internal protected static void RaiseWrite(TransactionBase tb, object sender = null) { tb.TransactionWrite(sender, tb); }

internal protected static void RaiseRead(TransactionBase tb, object sender = null) { tb.TransactionRead(sender, tb); }

internal protected static void RaiseCancelled(TransactionBase tb, object sender = null) { tb.TransactionCancelled(sender, tb); }

#endregion

#region Fields

public readonly System.DateTimeOffset Created = System.DateTimeOffset.UtcNow;

protected long TransactionOffset;

protected readonly System.Threading.CancellationTokenSource CancelTokenSource;

protected readonly System.Threading.CancellationToken CancelToken;

#endregion

#region Properties

#endregion

#region Virtual Properties

public virtual bool IsTransactionDone { get; protected set; }

public virtual int LastTransactionAmount { get; protected set; }

public virtual long TotalTransactionBytesWritten { get { return TransactionOffset; } }

public virtual System.IAsyncResult AsyncTransactionResult { get; internal set; }

public System.IAsyncResult AsyncResult { get; internal set; }

public virtual object AsyncState { get { return AsyncResult.AsyncState; } }

public virtual System.Threading.WaitHandle AsyncWaitHandle { get { return AsyncResult.AsyncWaitHandle; } }

public virtual bool CompletedSynchronously { get { return AsyncResult.CompletedSynchronously; } }

public virtual bool IsCompleted
{
get { return IsTransactionCanceled || IsTransactionDone || AsyncTransactionResult != null && AsyncTransactionResult.IsCompleted; }
}

#endregion

#region Constructor / Destructor

public TransactionBase(bool shouldDispose = false)
:base(shouldDispose)
{
CancelTokenSource = new System.Threading.CancellationTokenSource();

CancelToken = CancelTokenSource.Token;

TransactionCompleted += TransactionBase.HandleCompleted;

TransactionCancelled += TransactionBase.HandleCancelled;

TransactionRead += TransactionBase.HandleRead;

TransactionWrite += TransactionBase.HandleWrite;
}

~TransactionBase() { Dispose(); }

#endregion

#region Overrides

protected override void Dispose(bool disposing)
{
ShouldDispose = disposing || (System.DateTimeOffset.UtcNow - Created) > DownloadReference.MaxLifeTime;

if (IsDisposed || false == ShouldDispose) return;

Cancel();

if (AsyncResult != null) AsyncResult = null;

if (TransactionRead != null)
{
TransactionRead -= TransactionBase.HandleRead;

TransactionRead = null;
}

if (TransactionWrite != null)
{
TransactionWrite -= TransactionBase.HandleWrite;

TransactionWrite = null;
}

if (TransactionCompleted != null)
{
TransactionCompleted -= TransactionBase.HandleCompleted;

TransactionCompleted = null;
}

if (TransactionCancelled != null)
{
TransactionCancelled -= TransactionBase.HandleCancelled;

TransactionCancelled = null;
}
}

#endregion

#region Virtual Methods

public virtual void Cancel()
{
if (false == IsCompleted && CancelTokenSource != null)
{
CancelTokenSource.Cancel();

if (TransactionCancelled != null) TransactionCancelled(null, this);
}
}

public virtual void CancelTransaction()
{
Cancel();
}

public virtual bool IsTransactionCanceled
{
get { return CancelTokenSource.IsCancellationRequested || CancelToken.IsCancellationRequested; }
}

#endregion

#region Methods

#endregion

#region Events

public event EventHandlerEx<ITransactionResult> TransactionRead;

public event EventHandlerEx<ITransactionResult> TransactionWrite;

public event EventHandlerEx<ITransactionResult> TransactionCompleted;

public event EventHandlerEx<ITransactionResult> TransactionCancelled;

#endregion
}

//sealed Implementation

public sealed class StreamCopyTransaction : TransactionBase
{

public System.IO.Stream Source { get; internal set; }

public readonly bool DisposeSource;

public System.IO.Stream Destination { get; internal set; }

public readonly bool DisposeDestination;

public Common.MemorySegment Memory { get; internal set; }

public System.AsyncCallback ReadFunction { get; internal set; }

public System.Action<long, byte[], int, int> WriteFunction { get; set; }

public StreamCopyTransaction(System.IO.Stream source, System.IO.Stream dest, int bufferSize, System.Action<long, byte[], int, int> writeFunction)
{
Source = source;

Destination = dest;

Memory = new MemorySegment(bufferSize);

ReadFunction = CopyLogic;

WriteFunction = writeFunction;

AsyncResult = source.BeginRead(Memory.Array, 0, bufferSize, ReadFunction, this);
}

public StreamCopyTransaction(System.IO.Stream source, bool disposeSource, System.IO.Stream dest, bool disposeDest, int bufferSize, System.Action<long, byte[], int, int> writeFunction)
:this(source, dest, bufferSize, writeFunction)
{
DisposeSource = disposeSource;

DisposeDestination = disposeDest;
}

void CopyLogic(System.IAsyncResult iar)
{
try { LastTransactionAmount = Source.EndRead(iar); }
catch { LastTransactionAmount = 0; }

switch (LastTransactionAmount)
{
case 0:
{
TransactionBase.RaiseCompleted(this);

return;
}
default:
{
TransactionBase.RaiseRead(this);

WriteFunction(TransactionOffset, Memory.Array, 0, LastTransactionAmount);

TransactionBase.RaiseWrite(this);

if(false == IsTransactionCanceled && false == IsDisposed) AsyncResult = Source.BeginRead(Memory.Array, 0, Memory.Count, ReadFunction, this);

return;
}
}
}

~StreamCopyTransaction() { Dispose(ShouldDispose = true); }
}
}

 

Cache Container.Node instances to reduce context switching and IO with a structure like so:

 

public struct NodeReference
{
readonly IMediaContainer Master;

//Master.ReadIdentifier..

public NodeReference(IMediaContainer master)
: this()
{
if (master == null) throw new System.ArgumentNullException("Master");

Master = master;
}

public NodeReference(IMediaContainer master, long dataOffset, long totalSize)
: this(master)
{
DataOffset = dataOffset;

TotalSize = totalSize;
}

public NodeReference(Node n)
: this(n.Master, n.DataOffset, n.TotalSize)
{

}

internal long DataOffset;

internal long TotalSize;

public Node ToNode()
{
return new Node(Master, null /*Master.ReadIdentifier*/, 8/*Master.LengthSize*/, DataOffset, TotalSize, TotalSize > 0);
}
}

 

Augment the MediaFileStream with logic to support the above, thus allowing a variety of input sources and protocols:

 

#region Constructor / Destructor

~MediaFileStream() { m_Disposed = true; Close(); }

/// <summary>
/// Creates a new FileStream from the given
/// </summary>
/// <param name="filename"></param>
/// <param name="access"></param>
public MediaFileStream(string filename, System.IO.FileAccess access = System.IO.FileAccess.Read) : this(new Uri(filename), access) { }

/// <summary>
/// Creates a new FileStream from the given
/// </summary>
/// <param name="location"></param>
/// <param name="access"></param>
public MediaFileStream(Uri location, System.IO.FileAccess access = System.IO.FileAccess.Read)
: base(location.LocalPath, System.IO.FileMode.Open, access, System.IO.FileShare.ReadWrite)
{
m_Source = location;

FileInfo = new System.IO.FileInfo(m_Source.LocalPath);

m_Position = base.Position;

m_Length = base.Length;
}

/// <summary>
/// Creates a new FileStream from the given.
/// </summary>
/// <param name="stream"></param>
/// <param name="access"></param>
public MediaFileStream(System.IO.FileStream stream, System.IO.FileAccess access = System.IO.FileAccess.Read, Uri uri = null)
: base(stream.SafeFileHandle, access)
{
try
{
m_Source = uri ?? new Uri(stream.Name);

if (m_Source.IsFile)
{
FileInfo = new System.IO.FileInfo(m_Source.LocalPath);

Position = stream.Position;

m_Length = stream.Length;
}
else
{
FileInfo = new System.IO.FileInfo(GetCurrentWorkingDirectory().Directory.ToString() + m_Source.Segments.Last());
}
}
catch { throw; }
}

#endregion

#region Buffering implementation..

public bool Buffering { get; protected set; }

//event BufferingComplete

bool m_ShouldDelete;

public MediaFileStream(Uri source, System.IO.Stream stream, DateTime? quantifier = null, int size = 8192, bool shouldDelete = true)
: base(GetCurrentWorkingDirectory().Directory.ToString() + (quantifier.HasValue ? quantifier.Value.ToFileTimeUtc() : DateTime.UtcNow.ToFileTimeUtc()).ToString(), System.IO.FileMode.CreateNew, System.IO.FileAccess.ReadWrite, System.IO.FileShare.ReadWrite)
{
m_Source = source;

m_ShouldDelete = shouldDelete;

FileInfo = new System.IO.FileInfo(Name);

Buffering = true;

Common.Extensions.Stream.StreamExtensions.ITransactionResult result = Common.Extensions.Stream.StreamExtensions.BeginCopyTo(stream, this, size, WriteAt);

//Done by WriteAt
////result.TransactionWrite += (s, e) =>
////{
//// m_Position += e.LastRead;

//// m_Length = e.Total;

////};

result.TransactionCompleted += IStreamCopyTransactionResultCompleted;

while (Root == null) result.AsyncWaitHandle.WaitOne(0);
}

void IStreamCopyTransactionResultCompleted(object sender, Common.Extensions.Stream.StreamExtensions.ITransactionResult t)
{
Buffering = false;

if (t != null)
{
t.TransactionCompleted -= IStreamCopyTransactionResultCompleted;

t.Dispose();
}
}

#endregion

#region Caching implementation...

bool m_ShouldCacheNodes;

public bool CachingNodes { get { return m_ShouldCacheNodes; } }

SortedDictionary<long, NodeReference> m_NodeCache = new SortedDictionary<long, NodeReference>();

protected virtual void CacheNode(Node n, bool checkSelf = true)
{
if (checkSelf && n.Master != this) throw new InvalidOperationException("n.Master is not this => [MediaFileStream] instance.");

m_NodeCache.Add(n.Offset, new NodeReference(n));
}

protected virtual bool TryCacheNode(Node n, bool checkSelf = true)
{
try { CacheNode(n, checkSelf); return true; }
catch { return false; }
}

public IEnumerable<NodeReference> CachedNodes { get { return m_NodeCache.Values; } }

public IEnumerable<long> CachedOffsets { get { return m_NodeCache.Keys; } }

#endregion

 

Last edited Aug 1, 2015 at 1:51 AM by juliusfriedman, version 1