Is the Reader distributed? If yes, how does that work?

I want to combine Gears, Hashes and Sets. Hashes will store profiles. Sets will be my indexes (storing subsets of Hash keys). Gears will mesh them together with the goal of producing N-dimensional contingency tables. As an example: “given that field value_1234 has possible values {1,2,3} and value_9876 has possible values {1,2}, give me the six counts (count of rows where value_1234=1 && value_9876=1, count of rows where value_1234=1 && value_9876=2, etc…) for all profiles that have both fields”.

I get the theory of map/reduce and have used Hadoop and other similar frameworks. In something like “original recipe Map/Reduce Hadoop” the input would be loaded by map tasks, each getting something like a file reference or a file reference with an offset+length. How does the Redis Reader distribute the work of generating input data?

To shift from a general question to something highly specific, I did a proof of concept. The following code a) computed the correct results, b) was faster with the “Set indexes” (i.e. faster when I used the PythonReader than when I used a KeysReader), and c) the speedup in “b” was proportional to the size of the “Set indexes” (the fewer items in the Set index, the faster it ran, I think linearly). My concern is that when I go from my POC (~100k rows) to my full dataset (~100M rows) I’ll hit a wall. For example, it seems to me that my Python would be 1) running on one shard (call it “A”), requiring remote Set values (which might have up to 100M members) to be moved over the network to shard “A” (for example, if Set value_1234 is on “A” and value_9876 is on “B”), and 2) all of the data I’m yielding starts on “A”, so distributing work to the other N-1 shards would mean moving those values over the network… no?

def SetAsIndex():
    # find all rows that match an ad hoc boolean query of fields; this is value_1234 AND value_9876
    res = execute('SINTER', 'value_1234', 'value_9876')
    for x in res:
        res2 = execute('HMGET', 'profile_' + x, 'value_1234', 'value_9876')
        # python isn't my daily language so I'm sure there is a better way of writing this...
        yield {'key': 'profile_' + x,
               'value': {'value_1234': res2[0], 'value_9876': res2[1]}}

bg = GearsBuilder('PythonReader')
# uncomment for the full-scan version
#bg = GearsBuilder()
#bg.filter(lambda x: 'value_1234' in x['value'] and 'value_9876' in x['value'])
bg.groupby(lambda r: r['value']['value_1234'] + ',' + r['value']['value_9876'],
           lambda key, a, r: 1 + (a if a else 0))
#bg.run('profile_*')
bg.run(SetAsIndex)


I hope I understood correctly what you are trying to achieve.

First, to your question: the reader runs on all the shards.
Also, it makes sense that your PythonReader is faster, because the KeysReader scans all the keys while your PythonReader only searches the Sets.

Now to the other questions. I see from your example that you are trying to achieve a countby; we have a built-in operation for this that does it more efficiently (a local count on each shard and then only a sum of the results): https://oss.redislabs.com/redisgears/0.9/operations.html#countby. It seems like what you need here is a Set per field per shard that indexes the hashes on that shard. Gears gives you a nice way to make sure you create a key that is located on the shard you are running on, using the hashtag() function (https://oss.redislabs.com/redisgears/0.9/runtime.html#hashtag): put the return value of hashtag() inside ‘{}’ in the key name, and then you know the Set is located correctly.

Your reader should read from the local Set (it also needs to use the hashtag() function to locate the correct Sets), and then you can use the countby operation, which performs a local count on each shard followed by a global sum per group. If the number of groups is much smaller than the number of records, only a small portion of the data will move between shards and most of the calculation will be local to each shard.
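To make that concrete, here is a rough, untested sketch of what I mean. The Set names and fields are taken from your example, and I am assuming the per-shard Sets are keyed with hashtag() and that their members are the full hash key names:

def LocalSetAsIndex():
    # the per-shard Sets are keyed with this shard's hashtag, e.g. 'value_1234{06S}'
    tag = hashtag()
    keys = ['value_1234{%s}' % tag, 'value_9876{%s}' % tag]
    # intersect only the Sets that live on this shard; no cross-shard data movement here
    for x in execute('SINTER', *keys):
        res = execute('HMGET', x, 'value_1234', 'value_9876')
        yield {'key': x, 'value': {'value_1234': res[0], 'value_9876': res[1]}}

bg = GearsBuilder('PythonReader')
# countby counts locally on each shard and then only sums the per-group results across shards
bg.countby(lambda r: r['value']['value_1234'] + ',' + r['value']['value_9876'])
bg.run(LocalSetAsIndex)

The exact extractor is up to you; the important part is that SINTER and HMGET only ever touch keys on the local shard.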

Hope it's clear; let me know if you need any more assistance with that.

Thanks for the response!

What I’m hearing:

1.) Instead of a single Set value_1234 I really have N Sets, one per shard. The members of these Sets hash to the same shard as the KEYS of my Hashes. E.g. value_1234_shard_1 might contain the members profile_1, profile_3454, and profile_4432 if those strings hash to shard 1.

2.) In the Reader, when I want to SINTER the logical Sets value_1234 and value_9876, I'm really using the N sharded Sets, one local to each shard.

3.) I then can use HMGET on these members (which are by definition keys local to that shard for the Hash lookups)

4.) I then can use local aggregations

5.) collect() and aggregate globally (this is the only “over the network” step)

?

How do I name the Sets to ensure they map to the correct shard? E.g. a Set keyed by value_1234_shard_1 might actually hash to shard 3.

Thanks again,

will

Yes, this is exactly what I meant. To make sure the Sets are mapped to the correct shard, you need to use the 'hashtag' function supplied by Gears (https://oss.redislabs.com/redisgears/runtime.html#hashtag). The 'hashtag' function returns a string whose hash slot maps to the shard on which the function was called. So instead of 'value_1234_shard_1' use 'value_1234{%s}' % hashtag(), and you will know for sure that you are reading the Set on the right shard.

Ok, and now this makes sense too!

Just to confirm: my existing code to add “value_4567=6” for profile “profile_12345678” was:

HSET profile_12345678 value_4567 6

SADD value_4567 profile_12345678

But instead I need to do:

HSET profile_12345678 value_4567 6

SADD value_4567{profile_12345678} profile_12345678

Right?

Thanks again!

will

Actually, this way you will have a Set per hash. What I had in mind was a Set per field per shard, but then it becomes hard to add data from the client because you do not know which shard's Sets you need to add the data to. Luckily, you can use a Gears registration for this task; it will look something like this:

def IndexHash(x):
    key = x['key']
    val = x['value']
    # add the hash key to a per-field Set located on this shard
    for k in val.keys():
        execute('sadd', '%s{%s}' % (k, hashtag()), key)

GB().foreach(IndexHash).register(mode='sync', eventTypes=['hset', 'hmset'])


The mode='sync' means that the triggered execution is local and runs on the same thread as the keyspace notification (it's not worth moving this indexing task to a background thread because it is a very small and fast task).

After you register this, each time you add a hash it will automatically be indexed locally:

127.0.0.1:6379> hset k foo bar bar foo
(integer) 2
127.0.0.1:6379> keys *
1) "foo{06S}"
2) "bar{06S}"
3) "k"

In this example the hash key is added to the 'foo{06S}' and 'bar{06S}' Sets because the hash contains the fields 'foo' and 'bar', and we know those Sets are located on the correct shard thanks to the '{}'.

Then you can run your other execution to search. Moreover, Gears comes with a CommandReader that allows you to trigger an execution from a command (notice that this feature is currently only available on master and is not yet documented; it will be soon). So basically, if you register this:

def SetAsIndex(r):
    # r is the list of field names passed to the trigger
    keys = ['%s{%s}' % (a, hashtag()) for a in r]
    res = execute('SINTER', *keys)
    final_res = []
    for x in res:
        res2 = execute('HMGET', x, *r)
        retDict = {'key': x}
        valueDict = {}
        for i in range(len(r)):
            valueDict[r[i]] = res2[i]
        retDict['value'] = valueDict
        final_res.append(retDict)
    return final_res

bg = GearsBuilder('CommandReader')
bg.map(lambda x: x[1:])  # the command reader's first argument is the trigger and we do not need it
bg.flatmap(SetAsIndex)
bg.countby(lambda r: ','.join(r['value'].keys()))
bg.register(trigger='intersect_search')


You will be able to call 'RG.TRIGGER intersect_search foo bar' and get the number of hashes that contain 'foo' and 'bar' as fields:
127.0.0.1:6379> RG.TRIGGER intersect_search foo bar
1) "{'key': 'foo,bar', 'value': 1}"


Notice that we are basically building an inverted index here using Sets. The real issue will come when you want to delete or update an existing hash: how do you update the index? It is possible to handle this with the CommandReader, triggering an execution that removes the key from the relevant Sets and then updates the hash (which will re-trigger the indexing).
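A rough, untested sketch of how that could look, using repartition() to move the record to the shard that owns the hash key so hashtag() resolves to the right Sets (the trigger name 'update_hash' and the 'key field value field value ...' argument layout are just placeholders):

def RemoveFromIndexAndUpdate(args):
    # hypothetical layout: args = [hash_key, field1, value1, field2, value2, ...]
    key = args[0]
    # this runs on the shard that owns the hash key (see the repartition below),
    # so hashtag() names that shard's per-field Sets
    for f in execute('HKEYS', key):
        execute('SREM', '%s{%s}' % (f, hashtag()), key)
    # rewriting the hash fires the 'hset' notification, which re-runs the indexing registration
    execute('HSET', key, *args[1:])

bg = GearsBuilder('CommandReader')
bg.map(lambda x: x[1:])          # drop the trigger name
bg.repartition(lambda x: x[0])   # move the record to the shard that owns the hash key
bg.foreach(RemoveFromIndexAndUpdate)
bg.register(trigger='update_hash')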

Hope it's clear; let me know if you need any help with the update logic.

This is even better than I thought!

I read a little about event listeners but I hadn't put 2+2 together to realize that I could handle managing the inverted index (which is the goal, yes) without extra client-side code. The “even better” is the CommandReader/RG.TRIGGER. My real use case involves ad hoc queries to a custom database (*). I originally thought I was going to have to write client code to dynamically generate Python to push into Redis, but now I see a path to just sending the query to Redis and parsing/converting it “on the inside”, making this a drop-in replacement possibility.

(*) An example “real world” query could look something like this:

count value_1 in [1,2], value_2 in [1,8], value_3 in [1,3], value_4 in [1,7] from current where ((value_5=1) || (value_6=1) || (value_7=1) || (value_8=1) || (value_9=1) || (value_10=1) || (value_11=1) || (value_12=1) || (value_13=1) || (value_14=1) || (value_15=1) || (value_16=1) || (value_17=1) || (value_18=1) || (value_19=1) || (value_20=1))

This would return a 4D structure with 336 cells. In this case value_1 and value_2 have ~100M members but value_3 and value_4 are more modest (~100k), with maybe ~10k that overlap all 4. The condition involves a lot of fields of varying size (from 10s to 10Ms of members) but should be cacheable for reasonable periods of time (so instead of doing a SUNION I'd do a SUNIONSTORE with a findable name and set an expiration), and it filters the ~10k down to ~5k. The severe drop-off from 100M to 5k is why the inverted index needs to be utilized early in the process (and why I need the Set operations' “O(n) for the smallest set” characteristic).
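Concretely, something like this is what I'm picturing inside a gears function for the cached where clause (a rough sketch; the cache key name, the TTL, and the assumption that the condition maps to a list of index Sets are all just placeholders):

def cached_where_union(condition_sets, ttl=600):
    # condition_sets: names of the (logical) index Sets that appear in the where clause
    tag = hashtag()
    # a findable, shard-local name for the cached union
    cache_key = 'where:%s{%s}' % (','.join(sorted(condition_sets)), tag)
    if not execute('EXISTS', cache_key):
        # union this shard's per-field Sets and cache the result with an expiration
        execute('SUNIONSTORE', cache_key, *['%s{%s}' % (s, tag) for s in condition_sets])
        execute('EXPIRE', cache_key, ttl)
    return cache_key

The returned key could then go straight into the SINTER alongside the per-cell Sets, so the expensive union is only paid once per shard per expiration window.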

Thanks again. This is cool stuff,

will

I am not sure I entirely follow your example. I guess the first part (value_1 in [1,2]) is the index search, and then the where clause is a kind of filtering; I am not sure what the 'current' stands for.

Regarding the numbers, I guess it will require some tuning to find the number of shards that gives a good balance between local shard query processing and minimized network usage. The caching will for sure help performance (I guess the best approach is for each shard to have its own cache). I also think it is possible to clean the cache on data changes so that your cache stays up to date (but that really depends on the use case, which I do not entirely understand).
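For example, a rough, untested sketch of cleaning the cache on data changes (the 'where:' prefix is just whatever naming convention you pick for the cached unions, and in practice you would want something smarter than KEYS, like a registry Set of cache names):

def DropCachedUnions(x):
    # drop this shard's cached where-clause unions whenever a hash changes
    for cached in execute('KEYS', 'where:*'):
        execute('DEL', cached)

GB().foreach(DropCachedUnions).register(mode='sync', eventTypes=['hset', 'hmset', 'hdel', 'del'])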

I will be happy to continue the discussion and help you further. Can you confirm/deny the assumptions I wrote above (about what I understand regarding the example query)?