dynamicdata's Introduction

Dynamic Data

Dynamic Data is a portable class library which brings the power of Reactive Extensions (Rx) to collections.

Rx is extremely powerful but out of the box provides nothing to assist with managing collections. Most applications need to update collections dynamically: typically a collection is loaded and, after the initial load, asynchronous updates are received that the original collection must reflect. In simple scenarios the code is simple. However, typical applications are much more complicated and may apply a filter, transform the original DTO and apply a sort. Even with these simple everyday operations the complexity of the code is quickly magnified. Dynamic Data has been developed to remove the tedious code of dynamically maintaining collections. It has grown to become functionally very rich, with at least 60 collection-based operations which, amongst other things, enable filtering, sorting, grouping, joining different sources, transforms, binding, pagination, data virtualisation, expiration, disposal management and more.

The concept behind using Dynamic Data is that you maintain a data source (either SourceCache<TObject, TKey> or SourceList<TObject>), then chain together various combinations of operators to declaratively manipulate and shape the data without the need to directly manage any collection.

As an example, the following code filters trades to select only live trades, creates a proxy for each live trade, and finally orders the results by most recent first. The resulting trade proxies are bound on the dispatcher thread to an observable collection. Also, since the proxy is disposable, DisposeMany() will ensure the proxy is disposed when no longer used.

ReadOnlyObservableCollection<TradeProxy> list;

var myTradeCache = new SourceCache<Trade, long>(trade => trade.Id);
var myOperation = myTradeCache.Connect()
    .Filter(trade => trade.Status == TradeStatus.Live)
    .Transform(trade => new TradeProxy(trade))
    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
    .ObserveOnDispatcher()
    .Bind(out list)
    .DisposeMany()
    .Subscribe();

The magic is that as myTradeCache is maintained the target observable collection looks after itself.

This is a simple example to show how using Dynamic Data's collections and operators makes in-memory data management extremely easy and can reduce the size and complexity of your code base by abstracting complicated and often repetitive operations.

Sample Projects

Get in touch

If you have any questions, want to get involved or would simply like to keep abreast of developments, you are welcome to join the Slack community Reactive UI Slack. I am also available @RolandPheasant. There is a blog at https://dynamic-data.org/ but alas it is hopelessly out of date.

Create Dynamic Data Collections

The Observable List

Create an observable list like this:

var myInts = new SourceList<int>();

The observable list provides the direct edit methods you would expect. For example:

myInts.AddRange(Enumerable.Range(0, 10000)); 
myInts.Add(99999); 
myInts.Remove(99999);

The AddRange, Add and Remove methods above will each produce a distinct change notification. In order to increase efficiency when making multiple amendments, the list provides a means of batch editing. This is achieved using the .Edit method which ensures only a single change notification is produced.

myInts.Edit(innerList =>
{
   innerList.Clear();
   innerList.AddRange(Enumerable.Range(0, 10000));
});

If myInts is to be exposed publicly it can be made read only using .AsObservableList

IObservableList<int> readonlyInts = myInts.AsObservableList();

which hides the edit methods.

The list's changes can be observed by calling myInts.Connect() like this:

IObservable<IChangeSet<int>> myIntsObservable = myInts.Connect();

This creates an observable change set for which there are dozens of operators. The changes are transmitted as an Rx observable, so they are fluent and composable.
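
The difference between direct edits and a batch edit can be seen by counting the change sets a subscriber receives. The following is a minimal sketch, assuming the DynamicData and System.Reactive packages are referenced and C# top-level statements are available; the counter variables are illustrative names, not part of the library.

```csharp
using System;
using System.Linq;
using DynamicData;

var myInts = new SourceList<int>();

// Count every change set that reaches a subscriber.
var notifications = 0;
using var subscription = myInts.Connect().Subscribe(_ => notifications++);

// Two direct edits: each one is published as its own change set.
var before = notifications;
myInts.Add(1);
myInts.Add(2);
var fromDirectEdits = notifications - before;

// One batch edit: Clear and AddRange are published together.
before = notifications;
myInts.Edit(innerList =>
{
    innerList.Clear();
    innerList.AddRange(Enumerable.Range(0, 10000));
});
var fromBatchEdit = notifications - before;

Console.WriteLine($"direct edits: {fromDirectEdits}, batch edit: {fromBatchEdit}");
Console.WriteLine($"items in list: {myInts.Count}");
```

The counts are taken as differences so the sketch does not depend on whether connecting to an empty list emits an initial change set.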

The Observable Cache

Create an observable cache like this:

var myCache = new SourceCache<TObject,TKey>(t => key);

There are direct edit methods, for example

myCache.Clear();
myCache.AddOrUpdate(myItems);

The Clear and AddOrUpdate methods above will each produce a distinct change notification. In order to increase efficiency when making multiple amendments, the cache provides a means of batch editing. This is achieved using the .Edit method which ensures only a single change notification is produced.

myCache.Edit(innerCache =>
			  {
			      innerCache.Clear();
			      innerCache.AddOrUpdate(myItems);
			  });

If myCache is to be exposed publicly it can be made read only using .AsObservableCache

IObservableCache<TObject,TKey> readonlyCache = myCache.AsObservableCache();

which hides the edit methods.

The cache is observed by calling myCache.Connect() like this:

IObservable<IChangeSet<TObject,TKey>> myCacheObservable = myCache.Connect();

This creates an observable change set for which there are dozens of operators. The changes are transmitted as an Rx observable, so they are fluent and composable.

Creating Observable Change Sets

As stated in the introduction of this document, Dynamic Data is based on the concept of creating and manipulating observable change sets.

The primary method of creating observable change sets is to connect to instances of ISourceCache<T,K> and ISourceList<T>. There are, however, alternative methods to produce observable change sets, depending on the data source.

Connect to a Cache or List

Calling Connect() on an ISourceList<T> or ISourceCache<T,K> will produce an observable change set.

var myObservableChangeSet = myDynamicDataSource.Connect();

Create an Observable Change Set from an Rx Observable

Given either of the following observables:

IObservable<T> myObservable;
IObservable<IEnumerable<T>> myObservable;

an observable change set can be created by calling .ToObservableChangeSet like this:

var myObservableChangeSet = myObservable.ToObservableChangeSet(t=> t.key);

Create an Observable Change Set from an Rx Observable with an Expiring Cache

The problem with the example above is that the internal backing cache of the observable change set will grow in size forever. To counter this behavior, there are overloads of .ToObservableChangeSet where a size limitation or expiry time can be specified for the internal cache.

To create a time expiring cache, call .ToObservableChangeSet and specify the expiry time using the expireAfter argument:

var myConnection = myObservable.ToObservableChangeSet(t=> t.key, expireAfter: item => TimeSpan.FromHours(1));

To create a size limited cache, call .ToObservableChangeSet and specify the size limit using the limitSizeTo argument:

var myConnection = myObservable.ToObservableChangeSet(t=> t.key, limitSizeTo:10000);

There is also an overload to specify expiration by both time and size.

Create an Observable Change Set from an Observable Collection

var myObservableCollection = new ObservableCollection<T>();

To create a cache based observable change set, call .ToObservableChangeSet and specify a key selector for the backing cache

var myConnection = myObservableCollection.ToObservableChangeSet(t => t.Key);

or to create a list based observable change set call .ToObservableChangeSet with no arguments

var myConnection = myObservableCollection.ToObservableChangeSet();

This method is only recommended for simple queries which act only on the UI thread as ObservableCollection is not thread safe.

Create an Observable Change Set from a Binding List

var myBindingList = new BindingList<T>();

To create a cache based observable change set, call .ToObservableChangeSet and specify a key selector for the backing cache

var myConnection = myBindingList.ToObservableChangeSet(t => t.Key);

or to create a list based observable change set call .ToObservableChangeSet with no arguments

var myConnection = myBindingList.ToObservableChangeSet();

This method is only recommended for simple queries which act only on the UI thread as BindingList is not thread safe.

Using the ObservableChangeSet static class

There is also another way to create observable change sets, and that is to use the ObservableChangeSet static class. This class is a facsimile of the Rx.Net Observable static class and provides an almost identical API.

An observable list can be created as follows:

  var myObservableList = ObservableChangeSet.Create<int>(observableList =>
  {
      // some code to load data and subscribe
      var loader = myService.LoadMyDataObservable().Subscribe(observableList.Add);
      var subscriber = myService.GetMySubscriptionsObservable().Subscribe(observableList.Add);
      // dispose of resources
      return new CompositeDisposable(loader, subscriber);
  });

and creating a cache is almost identical except a key has to be specified

  var myObservableCache = ObservableChangeSet.Create<Trade, int>(observableCache =>
  {
	  //code omitted
  }, trade => trade.Id);

There are several overloads of ObservableChangeSet.Create which match the overloads that Observable.Create provides.

Consuming Observable Change Sets

The examples below illustrate the kind of things you can achieve after creating an observable change set. Now that you can create an observable cache or an observable list, here are a few quick-fire examples to illustrate the diverse range of things you can do. In all of these examples the resulting sequences always exactly reflect the items in the cache, i.e. adds, updates and removes are always propagated.

Create a Derived List or Cache

This example shows how you can create derived collections from an observable change set. It applies a filter to a collection, and then creates a new observable collection that only contains items from the original collection that pass the filter. This pattern is incredibly useful when you want to make modifications to an existing collection and then expose the modified collection to consumers.

Even though the code in this example is very simple, this is one of the most powerful aspects of Dynamic Data.

Given a SourceList

var myList = new SourceList<Person>();

You can apply operators, in this case the Filter() operator, and then create a new observable list with AsObservableList()

var oldPeople = myList.Connect().Filter(person => person.Age > 65).AsObservableList();

The resulting observable list, oldPeople, will only contain people who are older than 65.

The same pattern can be used with SourceCache by using .AsObservableCache() to create derived caches.
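
To see a derived list track its source, the sketch below adds and removes items and reads the derived list's Count after each step. It assumes the DynamicData and System.Reactive packages are referenced; a named tuple stands in for the person type purely to keep the example self-contained.

```csharp
using System;
using DynamicData;

// A named tuple stands in for a person type in this sketch.
var myList = new SourceList<(string Name, int Age)>();

// Derived, read-only list that tracks only people older than 65.
using var oldPeople = myList.Connect()
    .Filter(person => person.Age > 65)
    .AsObservableList();

myList.Add(("Alice", 70));
myList.Add(("Bob", 30));
var afterFirstAdds = oldPeople.Count;   // only Alice passes the filter

var carol = (Name: "Carol", Age: 80);
myList.Add(carol);
var afterCarolAdded = oldPeople.Count;  // Alice and Carol

myList.Remove(carol);
var afterCarolRemoved = oldPeople.Count; // Alice only

Console.WriteLine($"{afterFirstAdds}, {afterCarolAdded}, {afterCarolRemoved}");
```

The derived list is never edited directly; every change to it originates from an edit to myList.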

As an alternative to .Bind(out collection) you can use .BindToObservableList(out observableList) for both SourceList & SourceCache. This is useful for getting derived read-only lists from sources that use .AutoRefresh(), since collections do not support refresh notifications.

Filtering

Filter the observable change set by using the Filter operator

var myPeople = new SourceList<Person>();
var myPeopleObservable = myPeople.Connect();

var myFilteredObservable = myPeopleObservable.Filter(person => person.Age > 50); 

or to filter a change set dynamically

IObservable<Func<Person,bool>> observablePredicate=...;
var myFilteredObservable = myPeopleObservable.Filter(observablePredicate); 

Sorting

Sort the observable change set by using the Sort operator

var myPeople = new SourceList<Person>();
var myPeopleObservable = myPeople.Connect();
var mySortedObservable = myPeopleObservable.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age));

or to dynamically change sorting

IObservable<IComparer<Person>> observableComparer=...;
var mySortedObservable = myPeopleObservable.Sort(observableComparer);

For more information on sorting see wiki

Grouping

The GroupOn operator pre-caches the specified groups according to the group selector.

var myOperation = personChangeSet.GroupOn(person => person.Status);

The value of the inner group is represented by an observable list for each matched group. When values matching the inner grouping are modified, it is the inner group which produces the changes. You can also use GroupWithImmutableState which will produce a grouping whose inner items are a fixed size array.

Transformation

The Transform operator allows you to map objects from the observable change set to another object

var myPeople = new SourceList<Person>();
var myPeopleObservable = myPeople.Connect();
var myTransformedObservable = myPeopleObservable.Transform(person => new PersonProxy(person));

The TransformToTree operator allows you to create a fully formed reactive tree (only available for observable cache)

var myPeople = new SourceCache<Person, string>(p => p.Name);
var myTransformedObservable = myPeople.Connect().TransformToTree(person => person.BossId);

Flatten a child enumerable

var myOperation = personChangeSet.TransformMany(person => person.Children);

Aggregation

The Count, Max, Min, Avg, and StdDev operators allow you to perform aggregate functions on observable change sets

var myPeople = new SourceList<Person>();
var myPeopleObservable = myPeople.Connect();

var countObservable = myPeopleObservable.Count();
var maxObservable = myPeopleObservable.Max(p => p.Age);
var minObservable = myPeopleObservable.Min(p => p.Age);
var stdDevObservable = myPeopleObservable.StdDev(p => p.Age);
var avgObservable = myPeopleObservable.Avg(p => p.Age);

More aggregating operators will be added soon.

Logical Operators

The And, Or, Xor and Except operators allow you to perform logical operations on observable change sets

var peopleA = new SourceCache<Person,string>(p => p.Name);
var peopleB = new SourceCache<Person,string>(p => p.Name);

var observableA = peopleA.Connect();
var observableB = peopleB.Connect();

var inBoth = observableA.And(observableB);
var inEither= observableA.Or(observableB);
var inOnlyOne= observableA.Xor(observableB);
var inAandNotinB = observableA.Except(observableB);
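
As a rough illustration of what each operator yields, the sketch below materialises the four results as observable caches and prints their keys. It assumes the DynamicData and System.Reactive packages are referenced; integers that act as their own keys replace Person only to keep the example short.

```csharp
using System;
using System.Linq;
using DynamicData;

// Integers double as their own keys to keep the sketch short.
var cacheA = new SourceCache<int, int>(i => i);
var cacheB = new SourceCache<int, int>(i => i);

using var inBoth = cacheA.Connect().And(cacheB.Connect()).AsObservableCache();
using var inEither = cacheA.Connect().Or(cacheB.Connect()).AsObservableCache();
using var inOnlyOne = cacheA.Connect().Xor(cacheB.Connect()).AsObservableCache();
using var inAandNotInB = cacheA.Connect().Except(cacheB.Connect()).AsObservableCache();

cacheA.AddOrUpdate(new[] { 1, 2 });
cacheB.AddOrUpdate(new[] { 2, 3 });

// Render the keys of a result in ascending order.
string Show(IObservableCache<int, int> cache) =>
    string.Join(",", cache.Keys.OrderBy(k => k));

Console.WriteLine($"And:    {Show(inBoth)}");
Console.WriteLine($"Or:     {Show(inEither)}");
Console.WriteLine($"Xor:    {Show(inOnlyOne)}");
Console.WriteLine($"Except: {Show(inAandNotInB)}");
```

With A holding 1 and 2 and B holding 2 and 3, the results should correspond to the set intersection, union, symmetric difference and difference of the two key sets.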

A recent and very powerful feature is dynamic logical operators. From version 4.6 onwards you can dynamically include and exclude collections from the resulting list.

var list1 = new SourceList<int>();
var list2 = new SourceList<int>();
var list3  = new SourceList<int>();
	
var combined = new SourceList<ISourceList<int>>();

//child lists can be added or removed any time
combined.Add(list1);
combined.Add(list2);
combined.Add(list3);

//The operators look after themselves 
var inAll = combined.And();
var inAny = combined.Or();
var inOnlyOne= combined.Xor();
var inFirstAndNotAnyOther = combined.Except();

For more information on grouping see wiki

Disposal

The DisposeMany operator ensures that objects are disposed when removed from an observable stream

var myPeople = new SourceList<Person>();
var myPeopleObservable = myPeople.Connect();
var myTransformedObservable = myPeopleObservable.Transform(person => new DisposablePersonProxy(person))
                                                .DisposeMany();

The DisposeMany operator is typically used when a transform function creates disposable objects.

Distinct Values

The DistinctValues operator will select distinct values from the underlying collection

var myPeople = new SourceList<Person>();
var myPeopleObservable = myPeople.Connect();
var myDistinctObservable = myPeopleObservable.DistinctValues(person => person.Age);

Virtualisation

Virtualise data to restrict by index and segment size

IObservable<IVirtualRequest> request; //request stream
var virtualisedStream = someDynamicDataSource.Virtualise(request);

Virtualise data to restrict by index and page size

IObservable<IPageRequest> request; //request stream
var pagedStream = someDynamicDataSource.Page(request);

In either of the above, the result is re-evaluated when the request stream changes.

Top is an overload of Virtualise() and will return only the first 'n' items.

var topStream = someDynamicDataSource.Top(10);

Observing Properties of Objects in a Collection

If the collection is made up of objects that implement INotifyPropertyChanged then the following operators are available

The WhenValueChanged operator returns an observable of the value of the specified property when it has changed

var ageChanged = peopleDataSource.Connect().WhenValueChanged(p => p.Age);

The WhenPropertyChanged operator returns an observable made up of the value of the specified property as well as its parent object when the specified property has changed

var ageChanged = peopleDataSource.Connect().WhenPropertyChanged(p => p.Age);

The WhenAnyPropertyChanged operator returns an observable of objects when any of their properties have changed

var personChanged = peopleDataSource.Connect().WhenAnyPropertyChanged();

Observing item changes

Binding is a very small part of Dynamic Data. The property changed operators above are just one example of what is useful when binding. If you have a domain object which has child observables, you can use MergeMany(), which subscribes to and unsubscribes from items according to collection changes.

var myOperation = someDynamicDataSource.Connect()
    .MergeMany(trade => trade.SomeObservable());

This wires and unwires SomeObservable as the collection changes.
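
The wiring and unwiring can be observed with a small sketch in which each item in the list is itself an observable. It assumes the DynamicData and System.Reactive packages are referenced; the Subject<string> items are a stand-in for a domain object's SomeObservable() and are not part of the original example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using DynamicData;

// Each item is itself an observable; a Subject stands in for a
// domain object's event stream in this sketch.
var source = new SourceList<Subject<string>>();
var received = new List<string>();

using var subscription = source.Connect()
    .MergeMany(item => item)
    .Subscribe(received.Add);

var first = new Subject<string>();
source.Add(first);        // MergeMany subscribes to 'first'
first.OnNext("tick 1");   // delivered to the merged stream

source.Remove(first);     // MergeMany unsubscribes from 'first'
first.OnNext("tick 2");   // no longer delivered

Console.WriteLine(string.Join(", ", received));
```

Only values published while the item is in the collection reach the merged stream, so no manual subscription bookkeeping is needed.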

Observable list vs observable cache

I get asked about the differences between these a lot and the answer is really simple. If you have a unique id, you should use an observable cache, as it is dictionary based, which ensures no duplicates can be added, and it notifies on adds, updates and removes. A list, by contrast, allows duplicates and has no concept of an update.
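
The distinction can be sketched by writing the same item twice to each kind of source. This assumes the DynamicData and System.Reactive packages are referenced; the named tuple and the counter variables are illustrative only.

```csharp
using System;
using DynamicData;

// A list keeps every item it is given, duplicates included.
var list = new SourceList<(int Id, string Name)>();
list.Add((1, "Ann"));
list.Add((1, "Ann"));
Console.WriteLine($"list count: {list.Count}");

// A cache keeps one item per key; a second write to the same key is an update.
var cache = new SourceCache<(int Id, string Name), int>(person => person.Id);
var adds = 0;
var updates = 0;
using var subscription = cache.Connect().Subscribe(changes =>
{
    adds += changes.Adds;
    updates += changes.Updates;
});

cache.AddOrUpdate((1, "Ann"));
cache.AddOrUpdate((1, "Anne"));

Console.WriteLine($"cache count: {cache.Count}, adds: {adds}, updates: {updates}");
Console.WriteLine($"current value for key 1: {cache.Lookup(1).Value.Name}");
```

The list ends up with two entries, while the cache holds a single entry for key 1 and reports the second write as an update rather than an add.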

There is another difference. The cache side of dynamic data is much more mature and has a wider range of operators. Having more operators is mainly because I found it easier to achieve good all round performance with the key based operators and do not want to add anything to Dynamic Data which inherently has poor performance.

History of Dynamic Data

Even before Rx existed I had implemented a similar concept using old fashioned events but the code was very ugly and my implementation full of race conditions so it never existed outside of my own private sphere. My second attempt was a similar implementation to the first but using Rx when it first came out. This also failed as my understanding of Rx was flawed and limited and my design forced consumers to implement interfaces. Then finally I got my design head on and in 2011-ish I started writing what has become dynamic data. No inheritance, no interfaces, just the ability to plug in and use it as you please. All along I meant to open source it but having so utterly failed on my first 2 attempts I decided to wait until the exact design had settled down. The wait lasted longer than I expected and ended up taking over 2 years but the benefit is it has been trialled for 2 years on a very busy high volume low latency trading system which has seriously complicated data management. And what's more that system has gathered a load of attention for how slick and cool and reliable it is both from the user and IT point of view. So I present this library with the confidence of it being tried, tested, optimised and mature. I hope it can make your life easier like it has done for me.

Want to know more?

I could go on endlessly but this is not the place for full documentation. I promise this will come but for now I suggest downloading my WPF sample app (links at top of document) as I intend it to be a 'living document' and I promise it will be continually maintained.

Also, if you follow me on Twitter you will find out when new samples or blog posts have been updated.

Additionally, if you have read up to here and not pressed star then why not? Ha. A star may make me be more responsive to any requests or queries.

dynamicdata's People

Contributors

andersstorhaug, chrispulman, chuuddo, dependabot-preview[bot], dependabot[bot], dwcullop, elijahreva, glennawatson, hinidu, hutterm, jakenveina, jasonwurzel, kmgallahan, liklainy, longtomjr, ltrzesniewski, mendelmonteiro, modplug, noggog, onliner10, owoudenberg, oysteinkrog, peter-b-, renovate[bot], robsonj, rolandpheasant, ryanholden8, stevosaurus, tomasfil, wouterdek

dynamicdata's Issues

issue sorting

with the following cache connect

        _pageController = new PageController();
        _filterController = new FilterController<MyVm>(myobject => myobject.Id != "");
        _sortController = new SortController<MyVm>(SortExpressionComparer<MyVm>.Descending(t => t.Id));

        var mySubscription = MyCache.Connect()
            .Filter(_filterController)
            .Sort(_sortController)
            .Page(_pageController)
            .ObserveOnDispatcher() 
            .Bind(Items)
            .Subscribe(); 

calling the following has no effect

_sortController.Change(SortExpressionComparer<MyVm>.Ascending(t => t.Id));

If I remove the page controller OR if I create a new page request like so
_pageController.Change(new PageRequest(1, 2));

then sorting works again. It looks like if you use a page controller without doing any paging, the sorting just doesn't work.

Edit: It also appears that if you make the page the same size or greater than the number of elements, sorting doesn't work.

Edit: Add Some items to the cache
MyCache.AddOrUpdate(new MyVm("1"));
MyCache.AddOrUpdate(new MyVm("2"));
MyCache.AddOrUpdate(new MyVm("3"));
MyCache.AddOrUpdate(new MyVm("4"));

        _pageController.Change(new PageRequest(1, 3));
        _sortController.Change(SortExpressionComparer<MyVm>.Ascending(t => t.Id));

This throws an ArgumentOutOfRangeException. However, a page size of 1 or 2 works fine.

Index out of range exception

so DateFavorited is a nullable DateTime.

In some combinations, such as when both TestVm's have a date set, and you remove one from the ItemCache, you get an index out of range error.

if you comment out the first item's date and remove the second one from the ItemCache, there is no error.

It also only happens if you are sorting on the DateFavorited field.

once this error occurs, changes to the itemcache stop refreshing the UI properly, so it's pretty nasty.

using System;
using System.Reactive.Linq;
using System.Windows;
using DynamicData.Binding;
using DynamicData;
using DynamicData.Controllers;
using DynamicData.Operators;

namespace WpfApplication3
{
    public partial class MainWindow : Window
    {
        public ObservableCollectionExtended<TestVm> Items { get; set; }
        public SourceCache<TestVm, int> ItemCache;
        public TestVm SelectedItem { get; set; }
        public PageController PageController;

        public MainWindow()
        {
            InitializeComponent();

            PageController = new PageController();
            var sortController = new SortController<TestVm>(SortExpressionComparer<TestVm>.Ascending(t => t.DateFavorited ?? DateTime.MinValue));
            var filterController = new FilterController<TestVm>(myVm => myVm.Id != 0);
            Items = new ObservableCollectionExtended<TestVm>();
            ItemCache = new SourceCache<TestVm, int>(myVm => myVm.Id);

            var item1 = new TestVm(1) { DateFavorited = DateTime.Now };
            var item2 = new TestVm(2) { DateFavorited = DateTime.Now };

            ItemCache.AddOrUpdate(item1);
            ItemCache.AddOrUpdate(item2);

            ItemCache.Connect()
                            .Filter(filterController)
                            .Sort(sortController).Do(OnNext)
                            .Page(PageController).Do(OnNext) //error doesnt occur with paging disabled
                            .ObserveOnDispatcher()
                            .Bind(Items)
                            .Subscribe();

            PageController.Change(new PageRequest(1, 100));

            DataContext = this;
        }

        private void OnNext(IChangeSet<TestVm, int> changeSet)
        {

        }

        private void BtnRemove_OnClick(object sender, RoutedEventArgs e)
        {
            if (SelectedItem != null)
            {
                SelectedItem.DateFavorited = null;
                ItemCache.Remove(SelectedItem); //ERROR!
            }
        }
    }

    public class TestVm
    {
        public int Id { get; set; }
        public DateTime? DateFavorited { get; set; }

        public TestVm(int id)
        {
            Id = id;
        }
    }
}

<Window x:Class="WpfApplication3.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="MainWindow" Height="350" Width="525">
    <Grid>
        <Grid.ColumnDefinitions>
            <ColumnDefinition />
            <ColumnDefinition />
        </Grid.ColumnDefinitions>

        <ListBox SelectedItem="{Binding SelectedItem}" ItemsSource="{Binding Items}" />

        <StackPanel Orientation="Vertical" Grid.Column="1">
            <Button Name="BtnRemove" Content="Remove" Click="BtnRemove_OnClick" />
        </StackPanel>
    </Grid>
</Window>

Enable capability to re-apply a transform

In the same way that an observable change set can dynamically change a filter, sort, group, page + more, @ButchersBoy has requested the ability to force dynamic data to re-apply a transform.

Experimenting with the API I have come up with something like this.

var personCache = new SourceCache<Person,string>(p=>p.Name);

//invoke OnNext to force a transform
var forceTransform = new Subject<Func<Person, string, bool>>();

var transformed = personCache.Connect()
.Transform(p => new PersonProxy(p), forceTransform);

//Use selector to choose which people should be transformed 
forceTransform.OnNext((person,key)=>true);

This is achievable and will be implemented soon.

Consolidate conversion from observable collections to change sets

There is duplicated code for converting to observable change sets from the following sources:

ObservableCollection
ReadOnlyObservableCollection
ReactiveList

This needs to be consolidated into common code.

Additionally, there is a bug where the index on reported changes does not always match the true index of the source item.

Wrappers generated by Transform are reused after being disposed

If a given type is being directed to different observable collections based on some filter, and with a transform, and it moves from one collection to another and then back, then the wrapper type is no longer disposed and recreated. On the last move, it's disposed and reused, leading to Bad Things. The test below fails:

using System;
using DynamicData;
using DynamicData.Binding;
using DynamicData.Controllers;
using NUnit.Framework;

namespace DDataTest {
    public class Widget {
        public int Id { get; set; }
        public bool Active { get; set; }
    }

    public class WidgetWrapper : IDisposable {
        public bool IsDisposed { get; private set; }
        public Widget Wrapped { get; private set; }
        public WidgetWrapper(Widget Target) {
            IsDisposed = false;
            Wrapped = Target;
        }

        public void Dispose() {
            IsDisposed = true;
        }
    }

    public class ThrowawayTest {

        [Test]
        public void Does_Not_Reuse_Disposed_Wrappers() {
            var subject = new SourceCache<Widget, int>(w => w.Id);

            var activeObservable = new ObservableCollectionExtended<WidgetWrapper>();
            var inactiveObservable = new ObservableCollectionExtended<WidgetWrapper>();

            var activeFilter = new FilterController<WidgetWrapper>(wrapper => wrapper.Wrapped.Active);
            var inactiveFilter = new FilterController<WidgetWrapper>(wrapper => !wrapper.Wrapped.Active);

            subject.Connect()
                   .Transform(w => new WidgetWrapper(w))
                   .Filter(activeFilter)
                   .Bind(activeObservable)
                   .DisposeMany()
                   .Subscribe();

            subject.Connect()
                   .Transform(w => new WidgetWrapper(w))
                   .Filter(inactiveFilter)
                   .Bind(inactiveObservable)
                   .DisposeMany()
                   .Subscribe();

            var w1 = new Widget() { Id = 0, Active = true };
            var w2 = new Widget() { Id = 1, Active = true };

            subject.AddOrUpdate(w1);
            subject.AddOrUpdate(w2);
            Assert.AreEqual(2, activeObservable.Count);
            Assert.AreEqual(0, inactiveObservable.Count);
            Assert.IsFalse(activeObservable[0].IsDisposed);
            Assert.IsFalse(activeObservable[1].IsDisposed);

            //This needs to be done twice to trigger the behavior
            w1.Active = !w1.Active;
            activeFilter.Reevaluate();
            inactiveFilter.Reevaluate();

            w1.Active = !w1.Active;
            activeFilter.Reevaluate();
            inactiveFilter.Reevaluate();

            Assert.AreEqual(2, activeObservable.Count);

            Assert.False(activeObservable[0].IsDisposed);
            Assert.False(activeObservable[1].IsDisposed);
        }

    }
}

Add OnItemAdded() for IObservableCache<TObject, TKey>

var cache = new SourceCache<Person, int>(x => x.Id);

cache.Connect()
    .OnItemAdded(a => a.MyEvent += OnMyEvent) // does not exist
    .OnItemRemoved(a => a.MyEvent -= OnMyEvent)
    .Subscribe();

A similar approach works with SourceList and IObservableList, but not the cache equivalent.
Have I missed something?

Here is my current approach that seems to work if you are interested in a PR:

    internal static class DynamicDataEx
    {
        public static IObservable<IChangeSet<TObject, TKey>> OnItemAdded<TObject, TKey>(
            this IObservable<IChangeSet<TObject, TKey>> source,
            Action<TObject, TKey> addAction)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (addAction == null)
                throw new ArgumentNullException("addAction");
            return new OnBeingAdded<TObject, TKey>(source, addAction).Run();

        }
    }

    internal sealed class OnBeingAdded<TObject, TKey>
    {
        private readonly Action<TObject, TKey> _callback;
        private readonly IObservable<IChangeSet<TObject, TKey>> _source;

        public OnBeingAdded(IObservable<IChangeSet<TObject, TKey>> source, Action<TObject, TKey> callback)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (callback == null)
                throw new ArgumentNullException("callback");
            _source = source;
            _callback = callback;
        }

        public IObservable<IChangeSet<TObject, TKey>> Run()
        {
            return _source.Do(RegisterForAddition);
        }

        private void RegisterForAddition(IChangeSet<TObject, TKey> changes)
        {
            foreach (Change<TObject, TKey> change in changes)
            {
                switch (change.Reason)
                {
                    case ChangeReason.Add:
                        _callback(change.Current, change.Key);
                        continue;
                        //                        case ChangeReason.AddRange:
                        //                            change.Range.ForEach<TObject, TKey>(this._callback);
                        //                            continue;
                    case ChangeReason.Update:
                        _callback(change.Current, change.Key);
                        continue;
                    default:
                        continue;
                }
            }
        }
    }

Confused ObservableList when merging two ObservableCollectionExtended

Hello Roland,

thank you for providing DynamicData which we use as a standard asset in our projects and which provides very elegant solutions for many bread-and-butter programming issues.

My team and I have discovered some unexpected behavior: when merging two observable collections and then adding and removing values, something strange happens. We created a small LinqPad script that illustrates the issue.

var a = new ObservableCollectionExtended<int>();
var b = new ObservableCollectionExtended<int>();

Observable.Merge(
        a.ToObservableChangeSet(),
        b.ToObservableChangeSet())
    //  .AddKey(x => x.GetHashCode())
    .QueryWhenChanged(x => x.Dump())
    .Subscribe();

a.Add(1);
a.Add(2);
b.Add(4);
a.Remove(1);

The result is as follows

[image: LinqPad output]

When you uncomment the line .AddKey[...], the whole thing is converted to an ObservableCache and the elements are added and removed as expected so we assume it is an issue with ObservableList.

Best,
djb

Can't use DynamicData 5.0.5.2078 in Profile111

Hi,
I have a problem using the current stable release in Profile111 (Xamarin Portable Library):

C:\Program Files (x86)\MSBuild\14.0\bin\Microsoft.Common.CurrentVersion.targets(1820,5): warning MSB3274: The primary reference "DynamicData, Version=5.0.5.2078, Culture=neutral, processorArchitecture=MSIL" could not be resolved because it was built against the ".NETPortable,Version=v5.0" framework. This is a higher version than the currently targeted framework ".NETPortable,Version=v4.5,Profile=Profile111".

I installed the nuget package without problems, no dependency errors.
Am I doing anything wrong here? Thank you, this is an amazing library!!!

Optimise batching in change aware list

When chains of operators are applied to the observable list, there can be a severe performance penalty unless updates are notified as batches, because list operations have to call IndexOf() continually.

To work around this the change aware list will attempt to batch changes even when the consumer does not do so.

{key} is missing from previous value

I have no idea how to debug an exception of type 'DynamicData.MissingKeyException' - "{key} is missing from previous value". It is happening on the .AddOrUpdate action.
A record with this {key} already exists in the SourceCache, so this is on Update.

Suggestion: Way to "Bind" ICollection<T> with SourceCache/SourceList

Hello Roland,

first of all, DynamicData is great!
I currently need to maintain a list (it's actually an internal settings representation) while also using a SourceCache to propagate changes and data to the rest of the application. Think of it as a settings service in an application.

It would be great to have a built-in way to reflect add and remove changes to a list, like .Bind()

With best regards,
Stefan

Incompatible with Reactive v3.0.0.0

Not really a bug, but unfortunately Reactive v3.0.0 changed its public key, which means DynamicData cannot be used with Reactive v3.0.0, even with an assembly version redirect in place.

From Reactive v3.0.0 release notes...
"The strong name key has been changed. The old key was a Microsoft key that could not be fully checked-in. The new key is checked in, allowing anyone to create custom builds easily. The package names have all changed"

The Reactive dependency will need to be updated to v3 before DynamicData can be used with it.

Group with IEqualityComparer?

Hey there, I'm looking for an overload of the Group function, that takes an IEqualityComparer, just like the GroupBy function in LINQ and Rx.

Is this something this library should/would/could implement, or is there any other method to achieve this functionality?
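For reference, the LINQ behaviour mentioned above can be sketched as follows. This uses only the standard Enumerable.GroupBy overload that accepts an IEqualityComparer<TKey>; it is not a DynamicData operator, and the class and method names (GroupByComparerDemo, CountByInitial) are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupByComparerDemo
{
    // Groups words by their first letter, ignoring case, using the LINQ
    // GroupBy overload that takes an IEqualityComparer<TKey>.
    // Assumes every word is non-empty.
    public static Dictionary<string, int> CountByInitial(IEnumerable<string> words)
    {
        return words
            .GroupBy(w => w.Substring(0, 1), StringComparer.OrdinalIgnoreCase)
            .ToDictionary(g => g.Key.ToUpperInvariant(), g => g.Count());
    }

    public static void Main()
    {
        var counts = CountByInitial(new[] { "ape", "Ant", "bear", "Boar", "cougar" });
        foreach (var pair in counts)
            Console.WriteLine($"{pair.Key}: {pair.Value}");
    }
}
```

With the comparer supplied, "ape" and "Ant" land in the same group even though their initials differ in case, which is the behaviour a comparer-aware Group overload would presumably need to reproduce.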

Paging Error

This is thrown after I add an item to a bound collection. I can't really seem to narrow down the exact cause, and my stuff still all works.

Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index

at System.ThrowHelper.ThrowArgumentOutOfRangeException()
at System.Collections.ObjectModel.Collection`1.RemoveAt(Int32 index)
at DynamicData.Binding.SortedObservableCollectionAdaptor`2.<>c__DisplayClass5.b__6(Change`2 update) in C:\projects\dynamicdata\DynamicData\Binding\SortedObservableCollectionAdaptor.cs:line 90

at System.ThrowHelper.ThrowArgumentOutOfRangeException()
at System.Collections.ObjectModel.Collection`1.RemoveAt(Int32 index)
at DynamicData.Binding.SortedObservableCollectionAdaptor`2.<>c__DisplayClass5.b__6(Change`2 update) in C:\projects\dynamicdata\DynamicData\Binding\SortedObservableCollectionAdaptor.cs:line 90
at DynamicData.Kernel.EnumerableEx.ForEach[T](IEnumerable`1 source, Action`1 action) in C:\projects\dynamicdata\DynamicData\Kernel\EnumerableEx.cs:line 30
at DynamicData.Binding.SortedObservableCollectionAdaptor`2.DoUpdate(ISortedChangeSet`2 updates, IObservableCollection`1 list) in C:\projects\dynamicdata\DynamicData\Binding\SortedObservableCollectionAdaptor.cs:line 82
at DynamicData.Binding.SortedObservableCollectionAdaptor`2.Adapt(ISortedChangeSet`2 changes, IObservableCollection`1 collection) in C:\projects\dynamicdata\DynamicData\Binding\SortedObservableCollectionAdaptor.cs:line 59
at DynamicData.DynamicDataEx.<>c__DisplayClass331`2.b__333(ISortedChangeSet`2 updates) in C:\projects\dynamicdata\DynamicData\DynamicDataEx.cs:line 2505
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
at System.Reactive.ExceptionHelpers.Throw(Exception exception)
at System.Reactive.Stubs.<.cctor>b__1(Exception ex)
at System.Reactive.AnonymousObserver`1.OnErrorCore(Exception error)
at System.Reactive.ObserverBase`1.OnError(Exception error)
at System.Reactive.AutoDetachObserver`1.OnErrorCore(Exception exception)
at System.Reactive.ObserverBase`1.OnError(Exception error)
at DynamicData.DynamicDataEx.<>c__DisplayClass331`2.b__333(ISortedChangeSet`2 updates) in C:\projects\dynamicdata\DynamicData\DynamicDataEx.cs:line 2509

Exception handling in subscription code

Hi Roland,

I've been playing with DynamicData for a while now and I've always been confused about the exception handling.

I read the code and its unit tests and I understand that:

  • any exception thrown while editing is caught and given back to an onError action if any, or rethrown. Seems fair to me.
  • any exception thrown while transforming or filtering will be caught and given back to all observers connected to the SourceList/SourceCache via the OnError action (given when subscribing).

But I think there is some unwanted, weird behavior when subscription code throws.

I tried with the following test:

[Test]
public void SingleSubscriberThatThrows()
{
    var s = new SourceList<int>();
    s.Connect().Subscribe(i => { throw new NotImplementedException(); });

    s.Edit(u => u.Add(1));
}

In this test an exception is correctly thrown when the new value is added to the SourceList, but it is caught and passed to the OnError action. There is only one subscriber here, and it is disconnected from the inner subject in the SourceList when its subscription code throws, so we never see any exception. (The subject's OnNext method throws, the only observer is disconnected, and the exception is caught and given back to the subject in order to redispatch it to the other subscribers, of which there are none.)

Let's try with two more subscribers:

[Test]
public void MultipleSubscribersWithOneThrowing()
{
    var s = new SourceList<int>();
    s.Connect().Subscribe(i => { });
    s.Connect().Subscribe(i => { throw new NotImplementedException(); });
    s.Connect().Subscribe(i => { });

    s.Edit(u => u.Add(1));
}

When you run this, an exception is actually thrown, because there are still two other subscribers behind the underlying subject and the exception is re-thrown because neither of them supplies an OnError action.

Now let's try with proper OnError handling:

[Test]
public void MultipleSubscribersHandlingErrorsWithOneThrowing()
{
    var firstSubscriberExceptionHandleCount = 0;
    var secondSubscriberExceptionHandleCount = 0;
    var s = new SourceList<int>();
    s.Connect().Subscribe(i => { }, e => firstSubscriberExceptionHandleCount++);
    s.Connect().Subscribe(i => { throw new NotImplementedException(); });
    s.Connect().Subscribe(i => { }, e => secondSubscriberExceptionHandleCount++);

    s.Edit(u => u.Add(1));

    Assert.That(firstSubscriberExceptionHandleCount, Is.EqualTo(1));
    Assert.That(secondSubscriberExceptionHandleCount, Is.EqualTo(1));
}

This test is successful! The two OnError actions are correctly called by the underlying subject, and the exception is thus correctly handled.

The code behind this behavior is the InvokeNext method on SourceList (and the same on SourceCache):

private void InvokeNext(IChangeSet<T> changes)
{
    if (changes.Count == 0) return;

    lock (_locker)
    {
        try
        {
            _changes.OnNext(changes);

            if (_countChanged.IsValueCreated)
                _countChanged.Value.OnNext(_readerWriter.Count);
        }
        catch (Exception ex)
        {
            _changes.OnError(ex);
        }
    }
}

I understand that using the OnError action of a subscriber is pretty useful when handling exceptions in filters or transformers, but catching exceptions the way it's done in the InvokeNext method is very dangerous in my opinion.
It's okay when multiple subscribers are connected to the observable, because the exception will be given to them, or thrown by the underlying subject when not every subscriber supplies an OnError action, so there is a way to be notified of the exception (it's just okay for me because this way feels a bit weird 😄).

But it's very dangerous when only one subscriber is connected: the exception will be swallowed and the subscription will remain broken due to the behavior of the underlying subject. No changes will be propagated after that.

And finally, it's extremely dangerous when using DynamicData's RefCount operator, because it wraps the underlying observer and gives the OnError action an empty implementation that swallows the exception.

My last test was using the RefCount operator:

[Test]
public void ExceptionShouldBeThrownInSubscriberCode()
{
    var changesCountFromSubscriber1 = 0;
    var changesCountFromSubscriber2 = 0;
    var source = new SourceList<int>();

    var observable = source.Connect().RefCount();
    observable.Subscribe(i => changesCountFromSubscriber1++);
    observable.Subscribe(i => { throw new NotImplementedException(); });
    observable.Subscribe(i => changesCountFromSubscriber2++);

    source.Edit(l => l.Add(1));
    source.Edit(l => l.Add(2));

    Assert.That(changesCountFromSubscriber1, Is.EqualTo(2));
    Assert.That(changesCountFromSubscriber2, Is.EqualTo(2));
}

The result is scary:

  • no exception thrown anywhere
  • changesCountFromSubscriber1 equals 1 instead of 2.
  • changesCountFromSubscriber2 equals 0 instead of 2.
  • all the changes after that will be ignored

The first Add works and then the exception is thrown, the second subscriber is not called, the subject is completed and the following calls are just dropped. You have to unsubscribe and subscribe again to make it work.
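The failure mode described above can be modelled without Rx or DynamicData. The sketch below is a deliberately simplified stand-in: MiniSubject, Publish and SwallowedExceptionDemo are hypothetical names, the subject is not how Rx implements Subject<T>, and the empty default OnError only imitates the swallowing wrapper described for RefCount. It merely shows how a try/catch shaped like InvokeNext turns one throwing subscriber into a silently terminated stream.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for a subject; NOT the Rx implementation.
public sealed class MiniSubject<T>
{
    private readonly List<(Action<T> OnNext, Action<Exception> OnError)> _observers
        = new List<(Action<T>, Action<Exception>)>();
    private bool _stopped;

    public void Subscribe(Action<T> onNext, Action<Exception> onError = null)
    {
        // Assumption: a missing OnError becomes an empty handler that swallows errors.
        _observers.Add((onNext, onError ?? (_ => { })));
    }

    public void OnNext(T value)
    {
        if (_stopped) return;                     // a terminated stream drops further values
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);               // a throw here skips the remaining observers
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;                          // the stream is finished for everyone
        foreach (var observer in _observers.ToArray())
            observer.OnError(error);
        _observers.Clear();
    }
}

public static class SwallowedExceptionDemo
{
    // Same try/catch shape as the InvokeNext method quoted above.
    public static void Publish<T>(MiniSubject<T> subject, T value)
    {
        try { subject.OnNext(value); }
        catch (Exception ex) { subject.OnError(ex); }
    }

    public static (int First, int Third) Run()
    {
        var first = 0;
        var third = 0;
        var subject = new MiniSubject<int>();
        subject.Subscribe(_ => first++);
        subject.Subscribe(_ => throw new NotImplementedException());
        subject.Subscribe(_ => third++);

        Publish(subject, 1);   // first subscriber runs, second throws, third is skipped
        Publish(subject, 2);   // dropped: the subject is already stopped

        return (first, third);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"{result.First}, {result.Third}");   // prints "1, 0"
    }
}
```

In this model no exception ever reaches the caller of Publish, and the counts match the ones reported above (1 and 0 instead of 2 and 2).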

Sorry for the super long issue, but I think it can be a tricky bug to find for the users of this library (I am one, and I had this bug once 😄).
I really don't have any good idea to solve this at the moment, but I'm sure we can find something great 👍

Any thoughts?

Reverse operator

Create an operator which reverses the result set order.

I often find that I order data by most recent first. This operator will achieve that much more efficiently than using the Sort() operator.

TransformMany operator problem

The TransformMany operator does not correctly handle the situation where children are removed as part of a parent update.

Can't combine SourceLists logically because of clear()

The below test fails

    [Fact]
    public void MergeManyShouldWork()
    {
        var a = new SourceList<int>();
        var b = new SourceList<int>();
        var c = new SourceList<int>();

        var parent = new SourceList<SourceList<int>>();
        parent.Add(a);
        parent.Add(b);
        parent.Add(c);

        var d = parent.Connect().MergeMany(e => e.Connect()).AsObservableList();

        d.Count.Should().Be(0);

        a.Add(1);

        d.Count.Should().Be(1);
        a.Add(2);
        d.Count.Should().Be(2);

        b.Add(3);
        d.Count.Should().Be(3);
        b.Add(5);
        d.Count.Should().Be(4);
        d.Items.Should().BeEquivalentTo(new[] {1, 2, 3, 5});


        b.Clear();

        // Fails below
        d.Count.Should().Be(2);
        d.Items.Should().BeEquivalentTo(new[] {1, 2});
    }

The Clear() method breaks the change set because it cannot be combined with other change sets, and it totally blows away the result.

RX based sort throws error.

SortController is now marked obsolete. When I try and use the preferred RX based overloads of .Sort, the method throws an ArgumentNullException.

You can reproduce this easily by creating a new Console App with the following Program.cs contents:

using System;
using System.Collections.ObjectModel;
using System.Reactive;
using System.Reactive.Subjects;
using DynamicData;
using DynamicData.Binding;

namespace DynamicDataSortWithRx
{
    public class Data
    {
        public Data(int id, string value)
        {
            Id = id;
            Value = value;
        }

        public int Id { get; }
        public string Value { get; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var cache = new SourceCache<Data, int>(d => d.Id);

            var sortPump = new Subject<Unit>();

            ReadOnlyObservableCollection<Data> datas;
            var disposable = cache.Connect()
                .Sort(SortExpressionComparer<Data>.Ascending(d => d.Id), sortPump)
                .Bind(out datas)
                .Subscribe();

            disposable.Dispose();
        }
    }
}

Stack trace:

   at System.Reactive.Linq.Observable.Synchronize[TSource](IObservable`1 source, Object gate)
   at DynamicData.Cache.Internal.Sort`2.<Run>b__7_0(IObserver`1 observer)
   at System.Reactive.AnonymousObservable`1.SubscribeCore(IObserver`1 observer)
   at System.Reactive.ObservableBase`1.Subscribe(IObserver`1 observer)
--- End of stack trace from previous location where exception was thrown ---
   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
   at System.Reactive.Stubs.<>c.<.cctor>b__2_1(Exception ex)
   at System.Reactive.AnonymousObserver`1.OnErrorCore(Exception error)
   at System.Reactive.ObserverBase`1.OnError(Exception error)
   at System.Reactive.SafeObserver`1.OnError(Exception error)
   at System.Reactive.Linq.ObservableImpl.Do`1._.OnError(Exception error)
   at System.Reactive.AutoDetachObserver`1.OnErrorCore(Exception exception)
   at System.Reactive.ObserverBase`1.Fail(Exception error)
   at System.Reactive.ObservableBase`1.Subscribe(IObserver`1 observer)
   at System.ObservableExtensions.SubscribeSafe[T](IObservable`1 source, IObserver`1 observer)
   at System.Reactive.Linq.ObservableImpl.Do`1.Run(IObserver`1 observer, IDisposable cancel, Action`1 setSink)
   at System.Reactive.Producer`1.Run(IScheduler _, State x)
   at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run(SchedulerQueue`1 queue)
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Producer`1.SubscribeRaw(IObserver`1 observer, Boolean enableSafeguard)
   at System.Reactive.Producer`1.Subscribe(IObserver`1 observer)
   at System.ObservableExtensions.Subscribe[T](IObservable`1 source)
   at DynamicDataSortWithRx.Program.Main(String[] args) in H:\GitHub\DynamicDataSortWithRx\DynamicDataSortWithRx\Program.cs:line 35
   at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
   at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
   at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
   at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.ThreadHelper.ThreadStart()

Throw more meaningful exceptions

Index out of range exceptions can be thrown for various reasons when sorting, virtualising or paging. It would be more meaningful to pre-check for index out of range and throw a wrapped exception.
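A minimal sketch of the pre-check idea, using only the base class library. The exception type ChangeIndexOutOfRangeException and the helper RemoveAtChecked are hypothetical names chosen for illustration; they are not part of DynamicData.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type, named here only for illustration.
public sealed class ChangeIndexOutOfRangeException : InvalidOperationException
{
    public ChangeIndexOutOfRangeException(string message) : base(message) { }
}

public static class CheckedListOperations
{
    // Validates the index first, so that a failure names the operation and
    // the collection size instead of surfacing a bare ArgumentOutOfRangeException.
    public static void RemoveAtChecked<T>(IList<T> list, int index, string operation)
    {
        if (list == null) throw new ArgumentNullException(nameof(list));

        if (index < 0 || index >= list.Count)
        {
            throw new ChangeIndexOutOfRangeException(
                $"{operation}: cannot remove at index {index}; " +
                $"the target collection has {list.Count} item(s).");
        }

        list.RemoveAt(index);
    }

    public static void Main()
    {
        var items = new List<string> { "a", "b" };
        RemoveAtChecked(items, 0, "Sort");          // succeeds, leaves ["b"]

        try
        {
            RemoveAtChecked(items, 5, "Page");      // index is past the end
        }
        catch (ChangeIndexOutOfRangeException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

The message carries the operation name and the collection size, which is the context a raw index error from Collection.RemoveAt lacks.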

Create dynamic Or combiner

I've had a hack at it. It seems to work but looks very inefficient.

        public static IObservable<IChangeSet<T>> Or<T>
        (IObservable<IChangeSet<SourceList<T>>> observable)
        {
            var sl = new SourceList<T>();
            var foo = observable
                .ToCollection()
                .Select
                (list => list.Count != 0 ? list.Select(l => l.Connect())
                             .Aggregate((a, b) => a.Or(b))
                             .ToCollection()
                             : Observable.Return(ImmutableList<T>.Empty)

                             )
                .Switch();

            var d = foo
                .Subscribe
                (b =>
                 {
                     sl.Edit
                         (e =>
                          {
                              e.Clear();
                              e.AddRange(b);
                          });

                 });

            return Observable.Using(()=>d,o=> sl.Connect());
        }

Can I wrap DB table as an observable

Hi, is it possible to get an observable reader wrapper on a database table? For example, I have a table of tickets. How do I watch the table with this lib? This would be a tremendous help.

Create overloads to allow sorting when properties change

The observable cache has .Sort() overloads that allow the consumer to change the comparer, or to force a re-sort when a property which is sorted on changes. These options are not available on the observable list and they need to be implemented.

For ChangeReason.Change, must supply previous value

When transforming changes into an Observable of a single value, it is possible that the transformed value is null (e.g. when a property is not set on an object), which is perfectly normal.

However, if such a transformation leads to a result value being changed from null to something else, there is an exception thrown in the constructor of Change with the following message: "For ChangeReason.Change, must supply previous value". (See sample code here for reproducing.)

Create OnItemAdded callback

There is OnItemRemoved but no OnItemAdded. I find that I sometimes have to do some work when something is added to the collection. My implementation is:

    internal sealed class OnBeingAdded<T>
    {
        private readonly List<T> _items = new List<T>();
        private readonly IObservable<IChangeSet<T>> _source;
        private readonly Action<T> _callback;

        public OnBeingAdded(IObservable<IChangeSet<T>> source, Action<T> callback)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (callback == null)
                throw new ArgumentNullException("callback");
            _source = source;
            _callback = callback;
        }

        public IObservable<IChangeSet<T>> Run()
        {
            return Observable.Create((Func<IObserver<IChangeSet<T>>, IDisposable>)(observer =>
           {
               var subscriber = _source.Do(RegisterForRemoval, observer.OnError).SubscribeSafe(observer);
               return Disposable.Create(() =>
                                        {
                                            subscriber.Dispose();
                                            _items.ForEach<T>(t => _callback(t));
                                            _items.Clear();
                                        });
           }));
        }

        private void RegisterForRemoval(IChangeSet<T> changes)
        {
            changes.ForEach(change =>
          {
              switch (change.Reason)
              {
                  case ListChangeReason.Add:
                      _callback(change.Item.Current);
                      break;
                  case ListChangeReason.AddRange:
                      change.Range.ForEach(_callback);
                      break;
                  case ListChangeReason.Replace:
                      _callback(change.Item.Current);
                      break;
                  case ListChangeReason.Remove:
                      break;
                  case ListChangeReason.RemoveRange:
                      break;
                  case ListChangeReason.Moved:
                      break;
                  case ListChangeReason.Clear:
                      break;
                  default:
                      throw new ArgumentOutOfRangeException();
              }
          });
            _items.Clone(changes);
        }
    }

    public static IObservable<IChangeSet<T>> OnItemAdded<T>
        (this IObservable<IChangeSet<T>> source,
         Action<T> addAction)
    {
        if (source == null)
            throw new ArgumentNullException("source");
        if (addAction == null)
            throw new ArgumentNullException("addAction");
        return new OnBeingAdded<T>(source, addAction).Run();
    }

Some operators share state

Some of the operators share state when subscribed to multiple times. This is clearly incorrect behaviour.

Sum aggregates nullable to non-nullable

Shouldn't null + null be null and not 0?

public static IObservable<Decimal> Sum<T>(this IObservable<IChangeSet<T>> source, Func<T, Decimal> valueSelector) {...}

public static IObservable<Decimal> Sum<T>(this IObservable<IChangeSet<T>> source, Func<T, Decimal?> valueSelector) {...}

Overload 2 can be covered by the valueSelector when using overload 1. The null sum scenario, however, is impossible at the moment.
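To make the question concrete, here is a small sketch over a plain in-memory sequence rather than a change set. It contrasts the standard LINQ Sum over decimal? values, which skips nulls and yields 0 when nothing is left, with a hypothetical SumOrNull helper that returns null when every value is null. The helper's semantics are an assumption about what is being requested, not existing DynamicData behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class NullableSumDemo
{
    // Hypothetical helper: returns null when every value is null (or the
    // sequence is empty); otherwise sums only the non-null values.
    public static decimal? SumOrNull(IEnumerable<decimal?> values)
    {
        decimal? total = null;
        foreach (var value in values)
        {
            if (value.HasValue)
                total = (total ?? 0m) + value.Value;
        }
        return total;
    }

    public static void Main()
    {
        var allNull = new decimal?[] { null, null };
        var mixed = new decimal?[] { 1.5m, null, 2m };

        // LINQ's nullable Sum ignores nulls, so an all-null sequence sums to 0.
        Console.WriteLine(allNull.Sum() == 0m);

        // The sketch keeps "no values at all" distinguishable from zero.
        Console.WriteLine(SumOrNull(allNull).HasValue);
        Console.WriteLine(SumOrNull(mixed) == 3.5m);
    }
}
```

Note that C#'s lifted + operator would make 1 + null equal to null as well; the helper above instead treats null as "no contribution", which is one possible reading of the request.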

Error with .Convert().Sort()

Hello

I have an issue with Convert followed by Sort. As soon as I remove an element of the underlying source, I get an exception:

Current position cannot be found. The item is not in the collection

The query is

cache.Connect()
    .Convert(nr => new Container(nr)) 
    .Sort(SortExpressionComparer<Container>.Ascending(c => c.Nr))
    .Subscribe(x => x.Dump(), ex => ex.Dump());

Here you can find a LinqPad script as .txt file.
Convert - Sort - Problem.txt

I do not see that exception when I use Transform or TransformSafe instead of Convert.
Is that by design?
Is Convert the wrong method to use here?

Thanks a lot for any hint or explanation.

Greetings
Peter

Shouldn't Count() provide '0' for an empty source cache?

I came across the need for an observable of the number of elements in a source cache.

var sourceCache = new SourceCache<string, int>(s => s.GetHashCode());

sourceCache.Connect()
            .Count()
            .Dump();

sourceCache.AddOrUpdate("test");

Result is:

IObservable<Int32>
1

From my point of view, the expected behavior should be:

IObservable<Int32>
0
1

Introduce Join Operators for ObservableCache

There are currently operators to apply logical collection operations. However, there is no direct means of joining 2 different caches together.

Ideally there should be a JoinOne (for one to one mappings) and a JoinMany (for one to many mappings).

For example

var people = new SourceCache<Person, string>(person => person.Name);
var addresses = new SourceCache<Address, string>(address => address.PersonName);

For a one to one mapping

var personWithOneAddress = people.JoinOne(addresses, address => address.PersonName, (person, address) => new PersonWithAddress(person, address));

and also something similar where a person has many addresses
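As a point of comparison only, the two shapes can be sketched with standard LINQ over static collections: Enumerable.Join for the one to one case and Enumerable.GroupJoin for the one to many case. This is an analogy and not the proposed operators; the Person and Address classes here are minimal stand-ins, and the City property is an assumption added for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Minimal stand-in types; City is an assumed property for illustration.
public sealed class Person
{
    public string Name { get; set; }
}

public sealed class Address
{
    public string PersonName { get; set; }
    public string City { get; set; }
}

public static class JoinAnalogy
{
    // Inner join: one result per matching (person, address) pair.
    // It is one to one only if each person has at most one address.
    public static List<string> JoinOne(IEnumerable<Person> people, IEnumerable<Address> addresses)
    {
        return people
            .Join(addresses, p => p.Name, a => a.PersonName, (p, a) => $"{p.Name}: {a.City}")
            .ToList();
    }

    // Group join: one result per person, carrying all of that person's addresses.
    public static List<string> JoinMany(IEnumerable<Person> people, IEnumerable<Address> addresses)
    {
        return people
            .GroupJoin(addresses, p => p.Name, a => a.PersonName,
                (p, matches) => $"{p.Name}: {string.Join(", ", matches.Select(a => a.City))}")
            .ToList();
    }

    public static void Main()
    {
        var people = new[] { new Person { Name = "Ann" }, new Person { Name = "Bob" } };
        var addresses = new[]
        {
            new Address { PersonName = "Ann", City = "Leeds" },
            new Address { PersonName = "Ann", City = "York" },
            new Address { PersonName = "Bob", City = "Bath" },
        };

        foreach (var line in JoinMany(people, addresses))
            Console.WriteLine(line);
    }
}
```

A dynamic version would additionally have to react when either side changes, which is the part LINQ does not cover.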

[Question] Connect to a bound list to create a "view on a view on a source" / "derived-derived collection"?

My scenario:

  1. there is a source list of domain entities;
  2. that source list is used to produce a derived list of view models which have a property IsSelected since selection is a GUI concern;
  3. how do I react to IsSelected property changes to create a "sub-derived" collection of selected view models?

Here is a working mixed ReactiveUI solution, which unfortunately has its own known problems.

private readonly ReactiveList<OrderViewModel> _items;
private readonly IReactiveDerivedList<OrderViewModel> _selectedItems;

orderProvider.Orders
    .Connect()
    .Transform(o => new OrderViewModel(o))
    .ObserveOnDispatcher()
    .Bind(_items)
    .DisposeMany()
    .Subscribe();

_selectedItems = _items.CreateDerivedCollection(x => x, x => x.IsSelected);

Is there any way to make it DynamicData-only maybe via WhenValueChanged? So far I have discovered Clone which doesn't seem to be the best fit.

help running out of memory, need a better suggestion/sample on observing

Hi, can you please help, I am using the library to watch a list of books from a table.

//Books is the catalog list, title, username, checkin Date, Check Out date
var llibraryCatalog = new SourceList<Books>();

On the server side I am using the Reactive Extensions to observe llibraryCatalog. When books are checked out, checked in, or overdue, I need to send notifications to the user. So I created the observable, and when that list changes, I write to the list.

Each user has an inbox when they log in, and I need to subscribe to changes on that filtered list.
// User's inbox - notifications
IObservable<IChangeSet<Books>> myIntsObservable = llibraryCatalog.Connect();

The list is at 12000 items and I have 2 issues I am seeking help with:

  1. I get an out of memory exception, or it gets slow. Can you please help me redo this better? 🔢
  2. How should I implement the filter for the users? Or, design-wise, should I simply have one master filtered list of all late books, all new books etc., and then have the user objects subscribe to it?

thanks

GroupOn does not bind to target collection when only items within a group have changed.

When I have a source collection that I project into a target collection with grouping, the target collection is correctly updated if a new item is added that causes the creation of a new group.
If I add an item to the source list that does not cause the creation of a new group, then the target collection won't reflect the newly added item.

The following unit test shows this issue clearly:

// A sample source list.
var sourceList = new ObservableCollection<string>(new[]
{
    "Ape",
    "Ant",
    "Bear",
    "Boar",
    "Cougar",
});

// The target list to update when the source list changes.
var targetList = new ObservableCollectionExtended<IGroup<string, char>>();
bool targetListIsUpdated;

sourceList
    .ToObservableChangeSet()
    .Sort(SortExpressionComparer<string>.Descending(x => x))
    .GroupOn(x => x[0])
    .Bind(targetList)
    .Subscribe(x =>
    {
        targetListIsUpdated = true;
    });

// This will pass the assert, because a new group will be created (there's no group by 'D' yet).
targetListIsUpdated = false;
sourceList.Add("Dolphin");
Assert.IsTrue(targetListIsUpdated);

// This will fail the assert, because no new group is created (there's already a group by 'A').
targetListIsUpdated = false;
sourceList.Add("Algae");
Assert.IsTrue(targetListIsUpdated);

MissingKeyException on Filter and Group

Hi there,

first of all, thanks for your great work. It looks like an amazingly useful utility and I would love to integrate it into some parts of my application. Unfortunately, on my first test I ran into an issue with grouping and filtering.
So here is what I try to do:

  1. A SourceCache is filled from a Service. Items are only added so far.
  2. After the Connect I apply a Filter and then I Group the data.
  3. The Filter may change and additional data may be added.

But now I run into MissingKeyException("{0} is missing from previous value on remove. Object type {1}, Key type {2}, Group key type {3}") in my big application.
I created a little test project and there I have the same issue. At least it looks like it, because in my big application it also crashes when the SourceCache is not changed. Just starting Edit and doing a loop inside makes the difference. If I do not call Edit after changing the filter, it does work. It also "works" if I throttle 250ms before and after the Filter part...
However, here is the sample. I hope you can help me with that and tell me what I'm doing wrong here... Thanks!
public sealed partial class MainPage : Page
   {
      private SourceCache<TestObject, int> m_sourceCache;
      private ReplaySubject<Func<TestObject, bool>> m_changeObs;
      private ReadOnlyObservableCollection<GroupContainer> m_entries;
      public ReadOnlyObservableCollection<GroupContainer> Entries => m_entries;

      public MainPage()
      {
         this.InitializeComponent();

         m_changeObs = new ReplaySubject<Func<TestObject, bool>>();

         var dispatcher = new CoreDispatcherScheduler(Window.Current.Dispatcher);

         m_sourceCache = new SourceCache<TestObject, int>(x => x.Id);
         m_sourceCache.Connect()
            .Filter(m_changeObs)
            .Group(x => x.Group)
            .Transform(x => new GroupContainer(x, dispatcher))
            .ObserveOn(dispatcher)
            .Bind(out m_entries)
            .Subscribe();

         Observable.Interval(TimeSpan.FromMilliseconds(250))
            .Subscribe(x => DoFilterAndAdd());
      }

      private int m_count;
      private void AddItems()
      {
         m_sourceCache.Edit(innerCache =>
         {
            var list = new List<TestObject>();
            for (var i = 0; i < 10; i++)
            {
               m_count++;
               list.Add(new TestObject() {Id = m_count, Group = i, Text = DateTime.Now.ToString("T")});
            }
            innerCache.AddOrUpdate(list);
         });
      }

      private bool m_bool;
      private void DoFilterAndAdd()
      {
         m_bool = !m_bool;
         m_changeObs.OnNext(o => m_bool);
         AddItems();
      }
   }

   public class TestObject
   {
      public int Id { get; set; }

      public string Text { get; set; }

      public int Group { get; set; }
   }

   public class GroupContainer
   {
      private readonly ReadOnlyObservableCollection<GroupEntry> m_entries;

      public ReadOnlyObservableCollection<GroupEntry> Entries => m_entries;

      public GroupContainer(IGroup<TestObject, int, int> groupEntries, IScheduler scheduler)
      {
         groupEntries.Cache.Connect()
                                 .Transform(x => (new GroupEntry(x)))
                                 .ObserveOn(scheduler)
                                 .Bind(out m_entries)
                                 .Subscribe();
         Title = groupEntries.Key.ToString("D");
      }
      public string Title { get; set; }

   }

   public class GroupEntry
   {
      public string Title { get; set; }

      public GroupEntry(TestObject obj)
      {
         Title = obj.Text;
      }
   }

Operators to convert a standard observable into an observable change set

Operators are required that take a standard Rx observable and convert it into an observable change set

IObservable<IChangeSet<TObject>> ToObservableChangeSet<TObject>(this IObservable<TObject> source)

A buffered converter is also needed

IObservable<IChangeSet<TObject>> ToObservableChangeSet<TObject>(this IObservable<IEnumerable<TObject>> source)

Additionally, add overloads to limit by size and by time

Code formatting/style

I've got a couple of suggestions, let me know if you agree and I'll create a pull request.

  1. Only use tabs - no spaces for indentation
  2. Don't use IEnumerable.ForEach, use foreach instead
  3. Put switch break; statements inside the scope regions if present

Cheers

Getting a random item from a paged, filtered list

I have an ItemCache connected like so, where Items is an ObservableCollection<ItemVm>

ItemCache.Connect()
                        .Filter(FilterController).Do(OnNext)
                        .Sort(SortController).Do(OnNext)
                        .Page(PageController).Do(OnNext)
                        .ObserveOnDispatcher()
                        .Do(changes => _pageParameters.Update(changes.Response))
                        .Bind(Items)
                        .Subscribe();

Here is my attempt at getting a random item. It is using Items

            // Random.Next's upper bound is exclusive, so passing Count lets the last item be chosen too
            var rInt = r.Next(0, Items.Count);
            var randomItemVm = Items.Skip(rInt).Take(1).Single();

The problem is that Items only ever holds as many items as there are per page. How can I pull a random item from ItemCache whether or not it is on the current page, keeping in mind that a filter has been applied?
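
One possible direction, sketched here in plain C# without DynamicData: draw the random item from the complete filtered set rather than from the page that is bound to the UI. The `RandomPick` class and its `PageOf` and `PickRandom` helpers are hypothetical names used only for illustration. In a DynamicData pipeline the filtered set would presumably come from a second branch taken after `Filter` and before `Page` (for example one materialised with `AsObservableCache()`), but that wiring is an assumption and is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RandomPick
{
    // Returns the 1-based page of the filtered items (what the UI would show).
    public static List<T> PageOf<T>(IReadOnlyList<T> filtered, int page, int pageSize)
    {
        return filtered.Skip((page - 1) * pageSize).Take(pageSize).ToList();
    }

    // Picks a random element from ALL filtered items, not just the current page.
    public static T PickRandom<T>(IReadOnlyList<T> filtered, Random random)
    {
        if (filtered.Count == 0)
        {
            throw new InvalidOperationException("No items match the filter.");
        }

        // The upper bound of Random.Next is exclusive, so every index is reachable.
        return filtered[random.Next(filtered.Count)];
    }
}
```

For instance, with `var filtered = Enumerable.Range(1, 100).Where(x => x % 2 == 0).ToList();`, `RandomPick.PageOf(filtered, 1, 10)` holds only ten items, while `RandomPick.PickRandom(filtered, new Random())` can return any of the fifty filtered values.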

Combine list operators

Write operators to combine lists using the following logic operators:

Or
And
Xor
Except
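
To make the intended semantics concrete, here is a minimal sketch of the four operators applied to static collections, using only LINQ and `HashSet<T>`. It is not the requested change-set implementation: the `SetLogic` class is a hypothetical name, and treating each list as a set (duplicates collapse) is an assumption about the desired behaviour.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class SetLogic
{
    // Or: items present in either list.
    public static List<T> Or<T>(IEnumerable<T> a, IEnumerable<T> b)
    {
        return Enumerable.Union(a, b).ToList();
    }

    // And: items present in both lists.
    public static List<T> And<T>(IEnumerable<T> a, IEnumerable<T> b)
    {
        return Enumerable.Intersect(a, b).ToList();
    }

    // Xor: items present in exactly one of the two lists.
    public static List<T> Xor<T>(IEnumerable<T> a, IEnumerable<T> b)
    {
        var result = new HashSet<T>(a);
        result.SymmetricExceptWith(b);
        return result.ToList();
    }

    // Except: items present in the first list but not in the second.
    public static List<T> Except<T>(IEnumerable<T> a, IEnumerable<T> b)
    {
        return Enumerable.Except(a, b).ToList();
    }
}
```

With `a = { 1, 2, 3 }` and `b = { 2, 3, 4 }`, `Or` contains 1, 2, 3, 4; `And` contains 2, 3; `Xor` contains 1 and 4; and `Except` contains only 1. A dynamic version would have to recompute or incrementally adjust these results as each source list emits changes.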
