Insert data with older timestamp

Hi all,

I would like to use RedisTimeSeries as a cache for serving interactive time-series charts.

Sometimes I need to insert data with a timestamp older than the latest one in a given series, but I get the following error:

TSDB: Timestamp cannot be older than the latest timestamp in the time series

Is there any way to overcome this?

Thanks in advance, kind regards

Andrea

Hi Andrea,
Thanks for using RedisTimeSeries.
We do not support this at this time. You can work around it by reinserting all the data together with the old sample.
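A minimal sketch of that rebuild workaround, modeled in plain Python rather than against a live server (the helper name `rebuild_with_old_sample` is illustrative, not part of any API; in practice each append would be a TS.ADD on a freshly created key):

```python
# RedisTimeSeries (at the time of this thread) rejects samples older than the
# latest timestamp, so the only way to backfill is to merge the out-of-order
# sample into the full data set and re-insert everything in timestamp order.

def rebuild_with_old_sample(samples, old_sample):
    """Merge one out-of-order (timestamp, value) sample and return the data
    in the order it must be re-inserted."""
    merged = sorted(samples + [old_sample], key=lambda s: s[0])
    series = []
    for ts, value in merged:
        # Re-inserting in sorted order always satisfies the
        # monotonically-increasing-timestamp rule.
        series.append((ts, value))
    return series

data = [(100, 1.0), (200, 2.0), (300, 3.0)]
print(rebuild_with_old_sample(data, (150, 1.5)))
# [(100, 1.0), (150, 1.5), (200, 2.0), (300, 3.0)]
```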

Cheers,
Danni.

Hello Andrea,

RedisTimeSeries does not allow inserting additional timestamps in the past.

We could potentially support changing the value of an existing timestamp. Would that be sufficient for your use case?

regards,

Ariel

Hi Ariel,

I am using RedisTimeSeries as a cache for interactive charts: sometimes it is necessary to fetch older data into the cache for a given time series.

After using RedisTimeSeries for a while, I realised that I can handle my use case with different keys, adding the start and end timestamps as part of the series’ key (so the keys are unique for each fetch from the main database) and using labels to filter the series I have to serve.

This trick also helps me to figure out when fetching data from the slower on disk database is required. Then, using “mrange” and filtering by labels, I am able to address my use case.
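The keying trick above can be sketched in a few lines of plain Python (the helper names `make_key` and `covers` are illustrative only; the real flow would create the keys with TS.CREATE and query them with TS.MRANGE plus label filters):

```python
# Embed the fetched window in the key so every fetch from the slow on-disk
# store gets its own unique series; track the cached windows so we can tell
# when another fetch is required.

def make_key(metric, start_ts, end_ts):
    """Unique key per fetched window, e.g. 'temperature:0:1000'."""
    return f"{metric}:{start_ts}:{end_ts}"

def covers(cached_windows, start_ts, end_ts):
    """True if some cached window already contains the requested range,
    i.e. no trip to the slow on-disk database is needed."""
    return any(s <= start_ts and end_ts <= e for s, e in cached_windows)

windows = [(0, 1000), (1000, 2000)]
print(make_key("temperature", 0, 1000))  # temperature:0:1000
print(covers(windows, 100, 900))         # True
print(covers(windows, 900, 1100))        # False (spans two fetched windows)
```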

Now, I have a question: is it possible to modify “mrange” so that it processes and aggregates different keys as a whole?

Thanks in advance,

Andrea

I have added an “amrange” function (meaning “query multiple ranges all at once”) to the module, to illustrate my idea:

int TSDB_amrange(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    char *currentKey;
    size_t currentKeyLen;
    long long replylen;
    timestamp_t last_agg_timestamp;
    Series *series;
    void *context;
    Sample sample;
    // Dummy description, kept so the reply keeps the same interface
    // as the python redistimeseries module expects.
    const char *description = "Dummy";

    RedisModule_AutoMemory(ctx);

    if (argc < 4) {
        return RedisModule_WrongArity(ctx);
    }

    api_timestamp_t start_ts, end_ts;
    api_timestamp_t time_delta = 0;
    Series fake_series = {0};
    fake_series.lastTimestamp = LLONG_MAX;
    if (parseRangeArguments(ctx, &fake_series, 1, argv, &start_ts, &end_ts) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }

    AggregationClass *aggObject = NULL;
    int aggregationResult = parseAggregationArgs(ctx, argv, argc, &time_delta, &aggObject);
    if (aggregationResult == TSDB_ERROR) {
        return REDISMODULE_ERR;
    }

    int filter_location = RMUtil_ArgIndex("FILTER", argv, argc);
    if (filter_location == -1) {
        return RedisModule_WrongArity(ctx);
    }

    long long count = -1;
    if (parseCountArgument(ctx, argv, argc, &count) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }

    size_t query_count = argc - 1 - filter_location;
    QueryPredicate *queries = RedisModule_PoolAlloc(ctx, sizeof(QueryPredicate) * query_count);
    if (parseLabelListFromArgs(ctx, argv, filter_location + 1, query_count, queries) == TSDB_ERROR) {
        return RedisModule_ReplyWithError(ctx, "TSDB: failed parsing labels");
    }

    if (CountPredicateType(queries, (size_t)query_count, EQ) == 0) {
        return RedisModule_ReplyWithError(ctx, "TSDB: please provide at least one matcher");
    }

    RedisModuleDict *result = QueryIndex(ctx, queries, query_count);

    replylen = 0;
    last_agg_timestamp = 0;
    context = NULL;

    // Emit a single dummy series header so the reply has the same
    // [[name, labels, samples]] shape as TS.MRANGE.
    RedisModule_ReplyWithArray(ctx, 1);
    RedisModule_ReplyWithArray(ctx, 3);
    RedisModule_ReplyWithStringBuffer(ctx, description, strlen(description));
    RedisModule_ReplyWithArray(ctx, 1);
    RedisModule_ReplyWithArray(ctx, 2);
    RedisModule_ReplyWithStringBuffer(ctx, description, strlen(description));
    RedisModule_ReplyWithStringBuffer(ctx, description, strlen(description));
    RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);

    RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(result, "^", NULL, 0);
    if (aggObject != NULL) {
        context = aggObject->createContext();
    }
    while ((currentKey = RedisModule_DictNextC(iter, &currentKeyLen, NULL)) != NULL) {
        RedisModuleKey *key = RedisModule_OpenKey(ctx,
            RedisModule_CreateString(ctx, currentKey, currentKeyLen), REDISMODULE_READ);
        if (key == NULL || RedisModule_ModuleTypeGetType(key) != SeriesType) {
            RedisModule_Log(ctx, "warning",
                "couldn't open key or key is not a Timeseries. key=%s", currentKey);
            continue;
        }
        series = RedisModule_ModuleTypeGetValue(key);
        // In case a retention is set, we shouldn't return chunks older than the retention.
        if (series->retentionTime) {
            start_ts = series->lastTimestamp > series->retentionTime ?
                max(start_ts, series->lastTimestamp - series->retentionTime) : start_ts;
        }
        SeriesIterator iterator = SeriesQuery(series, start_ts, end_ts);
        while (SeriesIteratorGetNext(&iterator, &sample) != 0 && (count == -1 || replylen < count)) {
            if (aggObject == NULL) { // No aggregation whatsoever
                RedisModule_ReplyWithArray(ctx, 2);
                RedisModule_ReplyWithLongLong(ctx, sample.timestamp);
                RedisModule_ReplyWithDouble(ctx, sample.data);
                replylen++;
            } else {
                // Bucket the sample; emit the previous bucket once a newer one starts.
                timestamp_t current_timestamp = sample.timestamp - (sample.timestamp % time_delta);
                if (current_timestamp > last_agg_timestamp) {
                    if (last_agg_timestamp != 0) {
                        ReplyWithAggValue(ctx, last_agg_timestamp, aggObject, context);
                        replylen++;
                    }
                    last_agg_timestamp = current_timestamp;
                }
                aggObject->appendValue(context, sample.data);
            }
        }
        SeriesIteratorClose(&iterator);
    }
    if (aggObject != NULL && replylen != count) {
        // Reply with the last bucket of data.
        ReplyWithAggValue(ctx, last_agg_timestamp, aggObject, context);
        replylen++;
    }
    RedisModule_DictIteratorStop(iter);
    RedisModule_ReplySetArrayLength(ctx, replylen);
    return REDISMODULE_OK;
}
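The aggregation step above buckets each sample into `timestamp - (timestamp % time_delta)` and emits a bucket when a sample from a newer bucket arrives, flushing the final bucket at the end. A minimal Python restatement of that logic (using a `None` sentinel instead of the initial-zero check, and `sum` as an arbitrary stand-in for the aggregation class):

```python
# Time-bucketed aggregation, mirroring the amrange aggregation loop.

def aggregate(samples, time_delta, agg=sum):
    buckets = []        # (bucket_start_timestamp, aggregated_value)
    last_bucket = None  # sentinel: no bucket open yet
    values = []
    for ts, value in samples:
        bucket = ts - (ts % time_delta)
        if bucket != last_bucket:
            if last_bucket is not None:
                # A newer bucket started: emit the one we were filling.
                buckets.append((last_bucket, agg(values)))
            last_bucket, values = bucket, []
        values.append(value)
    if values:
        # Flush the final bucket, like the "reply last bucket" step above.
        buckets.append((last_bucket, agg(values)))
    return buckets

samples = [(0, 1.0), (5, 2.0), (10, 3.0), (21, 4.0)]
print(aggregate(samples, 10))  # [(0, 3.0), (10, 3.0), (20, 4.0)]
```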

Nice work! This is great, Andrea. A very useful function. Our team is discussing it and may have some feedback; in the meantime, keep it up and let us know how it is going.

Dave

Thanks, Dave! I hope this feature will be pulled into the module. Will let you know how it goes.

Andrea