Correct and fastest way to send Gears script to background using gears-cli

Hello folks,
I really like RedisGears for data science workflows and I really appreciate gears-cli, but I would like to send a large computation job to the background:

import spacy 
nlp=spacy.load('en_core_web_md', disable=['ner','tagger'])

def remove_prefix(text, prefix):
    return text[text.startswith(prefix) and len(prefix):]

def parse_paragraphs(x):
    key_prefix="paragraphs:"
    #detect language of the article
    lang=execute('GET', 'lang_article:' + x['key'])
    if lang=='en':
        paragraphs =x['value']
        doc=nlp(paragraphs)
        idx=1
        article_id=remove_prefix(x['key'],key_prefix)
        for each_sent in doc.sents:
            sentence_key=f"tokens:{article_id}:{idx}"
            execute('SET', sentence_key, each_sent)
            idx+=1
        execute('SADD','processed_docs_stage2_sentence', article_id)
    else:
        execute('SADD','screw_ups', x['key'])
    

gb = GB()
gb.foreach(parse_paragraphs)
gb.run('paragraphs:*')

(The code is on GitHub at https://raw.githubusercontent.com/AlexMikhalev/cord19redisknowledgegraph/master/spacy_sentences_geared.py)

The script has a dependency and also takes a chunk of memory and processing time. At the moment it works as a blocking call, gears-cli 10.144.133.112 --port 6379 spacy_sentences_geared.py --requirements requirements_gears_spacy.txt, but it takes longer than my patience or my broadband connection can handle.
Any hints?

Hey @AlexMikhalev

Notice that you basically return all the processed data to the client; I am not sure this is what you intended (because I also see you populate a set). I bet this is what is consuming the memory. If it is not what you intended, just put a ‘count’ operation before the ‘run’ so that only the number of records is returned. I think this will reduce the memory overhead you mentioned.

Regarding processing time, the best way to reduce it (especially for a task like this that can be fully parallelized) is to add more shards so the data is processed in parallel. How much time are we talking about, and how much data?
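
On the original question of not blocking the client: as far as I know, RG.PYEXECUTE accepts an UNBLOCKING argument that makes it return an execution ID right away instead of waiting for the results. The sketch below only builds the argument list, so it runs without a server. The helper name build_pyexecute_args and the one-line script are made up for illustration, and whether gears-cli exposes a matching option is something to check in its help output.

```python
def build_pyexecute_args(script_text, requirements=(), unblocking=True):
    """Return the argument list for an RG.PYEXECUTE call (hypothetical helper)."""
    args = ["RG.PYEXECUTE", script_text]
    if unblocking:
        # Assumption: UNBLOCKING makes the server reply with an execution ID
        # immediately instead of blocking until the job has finished.
        args.append("UNBLOCKING")
    if requirements:
        args.append("REQUIREMENTS")
        args.extend(requirements)
    return args


demo_args = build_pyexecute_args("GB().count().run('paragraphs:*')", ["spacy"])
print(demo_args)

# Hypothetical usage with redis-py (needs a reachable RedisGears server):
#   import redis
#   conn = redis.Redis(host="10.144.133.112", port=6379)
#   execution_id = conn.execute_command(*demo_args)
```

The returned execution ID can then be used to check on the job later, so a slow connection no longer has to stay open for the whole run.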

Hello @meirsh thank you for the reply.
I am adding count (testing now). The amount of text is about 11 GB and the run time is quite long; I am not sure it even finished. I am keen to explore sharding, though: I have a 7-node cluster.
Where would you suggest I look for a good example? I don’t want to register it for events.
One worry about submitting such a job to the whole cluster is that spaCy (or BERT) models are quite large, 400+ MB binaries.

For 11 GB you do not need 7 nodes; you just need more shards to parallelize the processing (they can run on the same machine if it has enough cores). I see what you mean regarding the model size. It will definitely increase the memory usage, but I guess this is the tradeoff. What you can do is load the model on only a few shards rather than on the whole cluster, and reshuffle the data to those shards to run the model there (but this comes with network overhead, so again it’s a tradeoff).

By the way, why don’t you want to register a gear that maintains those keys (I see you have a set and also strings)? I think a good pattern here would be to maintain those keys and sets locally on each shard (with a gears registration), and then, when you want to collect the data, run another gear function that collects and summarizes it.

Regarding references, take a look at the examples page in the documentation: https://oss.redislabs.com/redisgears/examples.html

Hello meirsh, I am deep in the examples. If there is a more specific example that illustrates your points it would be helpful, e.g. do I need to reshuffle data to the shards using the redis-cli --cluster reshard command?
Also, in terms of memory management, will using gc.collect help?
My understanding is that to pin processing to a RedisGears shard I have to use hashtag(); is that right? Is there a good example of distributed calculation besides the Monte Carlo Pi one?

@AlexMikhalev Did the count help with the memory usage? How much did the memory usage increase?

When I said reshuffling I meant the repartition operation (https://oss.redislabs.com/redisgears/operations.html#repartition). The idea is to decide on a subset of shards that will hold the model and to round-robin (for example) the records to those shards. To get a list of keys such that each key maps to one shard, do the following:

127.0.0.1:20004> RG.PYEXECUTE "GB('ShardsIDReader').map(lambda x: hashtag()).run()"
1) 1) "6YJ"
   2) "450"
   3) "06S"
2) (empty array)

Here, for example, my cluster has 3 shards, so I got 3 keys, each mapping to one of the shards. Now I can do the following to repartition the keys in a round-robin fashion to a subset of the shards:

shardsSubset = ['6YJ', '450']
index = 0

repatitionCallback = None
myHashTag = None

def KeepLocal():
    # current shard is part of the subset: keep the record where it is
    return hashtag()

def ChooseShard():
    # pick the next shard of the subset in round-robin order
    global index
    index = (index + 1) % len(shardsSubset)
    return shardsSubset[index]

def RepartitionCallback(r):
    # decide once per shard which strategy to use, then reuse it
    global repatitionCallback
    global myHashTag
    if repatitionCallback is None:
        myHashTag = hashtag()
        if myHashTag in shardsSubset:
            repatitionCallback = KeepLocal # curr shard is in shards subset, lets keep all record local
        else:
            repatitionCallback = ChooseShard # we need to pass the record to one of the shards subset
    return repatitionCallback()

GB().\
repartition(RepartitionCallback).\
foreach(lambda x: print('%s' % hashtag())).\
count().\
run()

The repartition operation will move the data to the subset of shards. You can increase/decrease this shards subset and see how it affects processing time.
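
To see what the round-robin part does in isolation, here is a small plain-Python sketch that runs outside Redis. It reuses the two hash tags from the example above; make_chooser is a hypothetical helper, and itertools.cycle stands in for the index arithmetic in ChooseShard.

```python
from collections import Counter
from itertools import cycle


def make_chooser(shards_subset):
    """Return a function that hands out hash tags in round-robin order."""
    rotation = cycle(shards_subset)
    return lambda: next(rotation)


chooser = make_chooser(["6YJ", "450"])

# Route ten imaginary records and count how many land on each hash tag.
assignments = Counter()
for _ in range(10):
    assignments[chooser()] += 1

print(dict(assignments))
```

With two tags and ten records each tag receives five, which is the even spread the repartition callback is aiming for on shards that are not part of the subset.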

Please notice that I am not sure this will improve performance. The network overhead may be too big (it also depends on the size of your data…), in which case you will have to live with the memory overhead of loading the model on each shard. As I said before, it’s a memory/processing-time tradeoff.

In addition, you need to be careful not to create keys that are not correctly mapped to the shard. Put the result of the hashtag function inside {}, or put the original key inside {}. Look at how we did it in the rgsync recipe: https://github.com/RedisGears/rgsync/blob/81af54750546c62ee9f7352875b88b6dd2864fbe/rgsync/common.py#L25
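
As a rough illustration of why the {} matters: Redis Cluster hashes only the part of the key between the first { and the next } when that part is non-empty, and takes CRC16 modulo 16384 to pick the slot. The sketch below reimplements that rule in plain Python. hash_slot is a hypothetical helper, the key names are examples, and binascii.crc_hqx with an initial value of 0 is, to my knowledge, the same CRC16 variant.

```python
import binascii


def hash_slot(key):
    """Return the Redis Cluster hash slot (0..16383) for a key."""
    data = key.encode()
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty {...} section counts as a hash tag.
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    return binascii.crc_hqx(data, 0) % 16384


source_key = "en:paragraphs:42"                   # example key, not from the dataset
derived_key = "sentences{%s}:42:1" % source_key   # tagged with the original key
untagged_key = "sentences:42:1"                   # no tag: hashed as a whole

print(hash_slot(source_key), hash_slot(derived_key), hash_slot(untagged_key))
```

The first two numbers are equal because only the text inside the braces is hashed, so the derived key lands on the same shard as the key it came from. The untagged key is hashed as a whole and can end up on any shard.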

You can also look here for more examples of how some of the built-in functionality Gears provides is implemented:
https://github.com/RedisGears/RedisGears/blob/master/src/GearsBuilder.py

Hope the answer is clear; let me know about the progress. I believe that if the processing time is still not good enough we can come up with more ideas to optimize.

@meirsh count helped greatly; now it’s back to the drawing board to make the NLP pipeline event-driven rather than batch.
Thank you. I will come back with more questions, but your replies helped greatly.

@meirsh turning the spaCy NLP step into an event-driven job proved to be a RedisGears bomb:

from redisgears import log
import spacy 
nlp=spacy.load('en_core_web_md', disable=['ner','tagger'])
nlp.max_length=2000000

def remove_prefix(text, prefix):
    return text[text.startswith(prefix) and len(prefix):]

def parse_paragraphs(x):
    key_prefix="en:paragraphs:"
    #make sure we only process english article
    paragraphs =x['value']
    doc=nlp(paragraphs)
    idx=1
    article_id=remove_prefix(x['key'],key_prefix)
    for each_sent in doc.sents:
        sentence_key=f"sentences:{article_id}:{idx}"
        execute('SET', sentence_key, each_sent)
        idx+=1
        execute('SADD','processed_docs_stage2_sentence', article_id)
        log("Successfully processed paragraphs "+str(article_id),level='notice')
    else:
        execute('SADD','screw_ups', x['key'])
    

gb = GB()
gb.foreach(parse_paragraphs)
gb.count()
gb.register('en:paragraphs:*',keyTypes=['string'])

I only changed run to register. Once it is registered and I try to process data, I watch the RedisGears cluster die with

On ExecutionPlan_NotifyRun, execution aborted 

I thought it was a memory issue, so I grabbed a server with 256 GB RAM and installed a fresh cluster with
docker run -d -p 30001:30001 -p 30002:30002 -p 30003:30003 redislabs/rgcluster
As soon as I try to process data (100 records) the cluster nodes die; two nodes consumed over 42.31 GB (and this is on 100 records).
It seems spaCy is a killer for event-driven gears. I didn’t try sharding and I doubt it will help, but it feels like I will have to resort to a more conventional approach.

@AlexMikhalev let’s not abandon this approach just yet, because I believe it’s the right one… Can you send me exact reproduction steps so I can try to identify the issue? When you say 100 records, do you mean 100 keys? I do see a couple of things here that might go wrong. For example, you create the sets and the keys without {}, and you do not define ExecutionMode (so it is global and not efficient).
If you can, please send me the reproduction with some of the data you tried, and I will try to identify the exact issue.

Also, if you can share the log files of the shards so I can look at them, that would be great…

@meirsh
Steps to reproduce:

mkdir ./input
pip install kaggle 
cd input
kaggle datasets download allen-institute-for-ai/CORD-19-research-challenge
unzip CORD-19-research-challenge.zip
docker run -d --name rgcluster -p 30001:30001 -p 30002:30002 -p 30003:30003 redislabs/rgcluster:latest
git clone https://github.com/AlexMikhalev/cord19redisknowledgegraph 
cd cord19redisknowledgegraph
pip install gears-cli
sh cluster_pipeline_events.sh

To grab the Kaggle dataset you need to get API keys and store them in ~/.kaggle/kaggle.json
cluster_pipeline_events.sh uses RedisIntakeRedisClusterSample.py, which is capped at 200 files.

Adding re-partition didn’t help either:

gb.repartition(lambda x: int(len(x['value'])))

logs


==> 30003.log <==
13:M 20 May 2020 13:54:11.639 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:11.639 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:11.639 # <module> On ExecutionPlan_NotifyRun, execution aborted

==> 30001.log <==
9:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyReceived, execution aborted

==> 30002.log <==
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyRun, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyRun, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyRun, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyRun, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted

==> 30003.log <==
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted

==> 30001.log <==
9:M 20 May 2020 13:54:32.347 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:37.826 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:41.290 * Marking node 7e443986124535b40885896a5d14e3f345f28c40 as failing (quorum reached).
9:M 20 May 2020 13:54:41.290 # Cluster state changed: fail
9:M 20 May 2020 13:54:41.623 # <module> On ExecutionPlan_NotifyReceived, execution aborted

==> 30003.log <==
13:M 20 May 2020 13:54:41.290 * FAIL message received from 6fc86dc7c8184b038b3680d0813edf76aa453e3c about 7e443986124535b40885896a5d14e3f345f28c40
13:M 20 May 2020 13:54:41.290 # Cluster state changed: fail

==> 30001.log <==
9:M 20 May 2020 13:54:45.640 * Clear FAIL state for node 7e443986124535b40885896a5d14e3f345f28c40: is reachable again and nobody is serving its slots after some time.
9:M 20 May 2020 13:54:45.641 # Cluster state changed: ok
9:M 20 May 2020 13:54:45.641 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:49.369 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:52.765 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:55.637 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:57.352 * Marking node effdf1f49518b3f80cdf91948522eab1d099d1c0 as failing (quorum reached).
9:M 20 May 2020 13:54:57.352 # Cluster state changed: fail

==> 30002.log <==
11:M 20 May 2020 13:54:57.352 * FAIL message received from 6fc86dc7c8184b038b3680d0813edf76aa453e3c about effdf1f49518b3f80cdf91948522eab1d099d1c0
11:M 20 May 2020 13:54:57.352 # Cluster state changed: fail

==> 30003.log <==
13:M 20 May 2020 13:54:57.732 * Clear FAIL state for node 7e443986124535b40885896a5d14e3f345f28c40: is reachable again and nobody is serving its slots after some time.
13:M 20 May 2020 13:54:57.732 # Cluster state changed: ok

==> 30001.log <==
9:M 20 May 2020 13:55:01.669 * Clear FAIL state for node effdf1f49518b3f80cdf91948522eab1d099d1c0: is reachable again and nobody is serving its slots after some time.
9:M 20 May 2020 13:55:01.669 # Cluster state changed: ok

==> 30002.log <==
11:M 20 May 2020 13:55:01.657 * Clear FAIL state for node effdf1f49518b3f80cdf91948522eab1d099d1c0: is reachable again and nobody is serving its slots after some time.
11:M 20 May 2020 13:55:01.657 # Cluster state changed: ok

@meirsh if you don’t want to download the Kaggle dataset, there is sample data inside the sample_data folder.
I am checking whether the behaviour is the same on the current edge single-instance Redis Docker image.

Checking, will reply soon.

@meirsh I checked with single instance:

docker pull redislabs/redismod:edge
docker run -p 6379:6379 redislabs/redismod:edge

The code works as expected: the pipeline runs until the spellchecker. It seems to be a killer only for a RedisGears cluster.

@AlexMikhalev it seems there are a couple of issues here. Some of them are in the gear function itself, but one is a real RedisGears issue (which is funny, because I am currently working on fixing it). It’s going to be a long reply, but I hope I can explain it well enough. And yes, you are basically right: it’s a cluster issue and it will not happen with one shard. So here is the explanation:

Let’s call the shard on which you executed RG.PYEXECUTE the initiator. The initiator gets the RG.PYEXECUTE command with the requirements list, then downloads and installs the requirements (before running the function). Then it runs the function and links the requirements to everything that function creates (registrations, runs, …). Now it needs to send the registration to all the other shards. To do that, it serializes the registration, together with the requirements and all the steps (Python functions), and sends it to all the shards. The serialized registration here is huge, not just because the requirements are very big but because it also includes the ‘nlp’ object (because it is used by the ‘parse_paragraphs’ function), which is itself huge. Up to here everything is OK: it should only happen once, so it’s not a big deal. The issue is that it does not happen only once… it happens each time the registration fires. Why? Because the registration mode is async, which means the execution is distributed across all the shards. So each time you write a key, Gears serializes and deserializes the registration to a buffer, and memory quickly explodes.
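
To get a feel for the numbers, here is a plain-Python analogy that runs outside Redis. It uses the standard pickle module and a 5 MB byte string as a stand-in for the model, so it is not how Gears actually serializes things; the dictionary layout and the names are made up for illustration.

```python
import pickle

# Stand-in for a large model; the real spaCy model in this thread is 400+ MB.
fake_model = bytes(5_000_000)

# Payload when the model travels with every triggered execution.
with_model = pickle.dumps({"step": "parse_paragraphs", "nlp": fake_model})

# Payload when only the event data travels and the model is loaded per shard.
data_only = pickle.dumps({"step": "parse_paragraphs", "key": "en:paragraphs:1"})

events = 100
print("per event:", len(with_model), "vs", len(data_only), "bytes")
print("for", events, "events:", events * len(with_model),
      "vs", events * len(data_only), "bytes")
```

Scaled up to a 400 MB model and 100 events, the first variant is roughly 40 GB of buffers. That is at least the same order of magnitude as the 42.31 GB reported above, although the match may be partly coincidental.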

Now, I agree that the requirements should not be serialized and deserialized each time, and neither should the execution itself; only the data it is working on (the key, for example) should be sent. I am currently working on fixing this for the next version. Until then I am going to suggest a “workaround” (which I also believe is the preferred way to do it, regardless of the issue I described). The “workaround” triggers a local execution on the shard that got the event (without creating a global execution, which requires initialization time in order to send it to all the shards).

The first thing I would suggest is to initialize ‘nlp’ on each shard separately. This can be done with the ‘onRegistered’ callback when registering the function; the callback is called on each shard upon registration (https://oss.redislabs.com/redisgears/functions.html#register). The second thing I would suggest is that each shard write to local keys by using {} and the ‘hashtag’ function. The last thing I am going to suggest is to change the execution mode to ‘async_local’. It will look something like this:

nlp = None

def OnRegistered():
    global nlp
    import spacy
    nlp=spacy.load('en_core_web_md', disable=['ner','tagger'])
    nlp.max_length=2000000

def remove_prefix(text, prefix):
    return text[text.startswith(prefix) and len(prefix):]

def parse_paragraphs(x):
    global nlp
    key_prefix="en:paragraphs:"
    #make sure we only process english article
    paragraphs =x['value']
    key = x['key']
    article_id=remove_prefix(key,key_prefix)
    try:
        doc=nlp(paragraphs)
        idx=1
        for each_sent in doc.sents:
            sentence_key="sentences{%s}:%s:%s" % (key, article_id, idx)
            execute('SET', sentence_key, each_sent)
            idx+=1
        #record success once per article, after all sentences are stored
        execute('SADD','processed_docs_stage2_sentence{%s}' % hashtag(), article_id)
        log("Successfully processed paragraphs "+str(article_id),level='notice')
    except Exception:
        #on any failure, remember the key in a shard-local set for later inspection
        execute('SADD','screw_ups{%s}' % hashtag(), x['key'])
    

GB().\
foreach(parse_paragraphs).\
count().\
register('en:paragraphs:*', keyTypes=['string'], onRegistered=OnRegistered, mode="async_local")

Now each time a key is written to a shard it will be processed locally on that shard. On failure the key is written to a shard-local key ‘screw_ups{<string that maps to the shard’s hslot>}’ and on success to ‘processed_docs_stage2_sentence{<same string>}’. Also, for each key, more keys will be created with the prefix ‘sentences{<the original key>}’, and thanks to the ‘{}’ we know they sit on the correct shard according to the hslot mapping.

Now you can use a batch gear execution to collect all the ‘processed_docs_stage2_sentence{…}’ keys and analyze them, or to get all the ‘screw_ups’. You can also easily get all the ‘sentences’ created for a given key.

One last thing to notice: Gears will run your function in the background, one execution at a time, for each event. If you write data faster than it can be processed, the backlog will grow (you can watch it with RG.DUMPREGISTRATIONS: compare the number of triggered executions with the number of completed executions; the difference is the backlog :) ). If you only have peaks, that is fine, because Gears will catch up after the peak ends. But if you always write faster than it can process, the backlog will keep growing. One way to avoid this is to set the execution mode to ‘sync’, which means the function runs on the same thread as the write itself and you only get the write reply after the processing has finished. With this approach you know for sure the backlog is not growing, but you also increase the reply latency. This is a tradeoff you will have to choose depending on the full use case.
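
The backlog arithmetic can be sketched in a few lines of plain Python. The field names numTriggered, numSuccess, numFailures and numAborted are what I believe the RegistrationData section of RG.DUMPREGISTRATIONS contains, so treat them as an assumption and check the output of your version. The backlog helper and the sample numbers are made up.

```python
def backlog(registration_data):
    """Triggered executions that have not finished yet (hypothetical helper)."""
    finished = (registration_data.get("numSuccess", 0)
                + registration_data.get("numFailures", 0)
                + registration_data.get("numAborted", 0))
    return registration_data["numTriggered"] - finished


# Made-up counters in the shape of one registration's RegistrationData.
sample = {"numTriggered": 120, "numSuccess": 95, "numFailures": 3, "numAborted": 0}
print(backlog(sample))
```

Polling this value every few seconds shows whether the backlog drains after a burst of writes or keeps climbing, which is the signal for deciding between ‘async_local’ and ‘sync’.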

Hope it’s clear. Please update us about the progress and let us know if you have any other questions or issues.

Thank you @meirsh, I will try your recommendation. Serialisation/deserialisation of nlp and another similar object (scispacy) is exactly what I was fighting when using dask.distributed and joblib.
I will rewrite using your sample. The next step in my process has even bigger memory requirements, using the transformers BERT tokeniser, and I just managed to blow up a single instance feeding it the same pipeline.

@AlexMikhalev Notice that you can put the initialization of all those objects in the onRegistered callback so they will not be serialized/deserialized.

Yes, thanks, doing it now.

@meirsh I managed to get this step working at least twice.
I am struggling to understand the role of {}, and I didn’t find any mention of it in the documentation.
Can you point me in the right direction?