
expirationd's Introduction


expirationd - data expiration with custom quirks.

This package can turn Tarantool into a persistent memcache replacement, but it is powerful enough to let you define your own expiration strategy.

You define two functions: one takes a tuple as input and returns true if it is expired and false otherwise. The other takes the tuple and performs the expiry itself: either deleting it (memcache-style), or doing something smarter, like putting a smaller representation of the deleted data into some other space.

There are a number of similar modules:

  • moonwalker is triggered manually; useful for batch transactions; performance is about 600-700k rec/sec.
  • expirationd always expires tuples using indices and any condition, without guarantees on expiration time.
  • indexpiration always expires tuples using indices, with good precision (down to milliseconds) for the expiration time.

The table below may help you choose the proper module for your requirements:

Module        | Reaction time | Uses indices | Arbitrary condition | Expiration trigger
------------- | ------------- | ------------ | ------------------- | ----------------------------------
indexpiration | High (ms)     | Yes          | No                  | synchronous (fiber with condition)
expirationd   | Medium (sec)  | Yes          | Yes                 | synchronous (fiber with condition)
moonwalker    | NA            | No           | Yes                 | asynchronous (using crontab etc.)

Prerequisites

Installation

You can:

  • Install the module using tt:

    tt rocks install expirationd
  • Install the module using LuaRocks:

    luarocks install --local --server=https://rocks.tarantool.org expirationd

Documentation

See the API documentation at https://tarantool.github.io/expirationd/

Note about using expirationd with replication: by default, expirationd processes tasks for all types of spaces only on the writable instance. It does not process tasks on a read-only instance for non-local persistent spaces. This means expirationd will not start task processing on a replica for regular spaces. One can force running a task on a replica with the force option of the start() module function. The force option lets a user control where task processing starts and where it does not.
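For example, a task can be forced to run even on a read-only replica (a sketch; the task name, space, and callback are illustrative, not part of the module):

```lua
local expirationd = require("expirationd")

-- Illustrative names: a "clean_sessions" task over a hypothetical
-- box.space.sessions space.
local function is_expired(args, tuple)
    return true -- replace with a real expiration check
end

expirationd.start("clean_sessions", box.space.sessions.id, is_expired, {
    -- Process tuples on this instance even if it is a read-only replica.
    force = true,
})
```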

Examples

Simple version:

box.cfg{}
space = box.space.old
job_name = "clean_all"
expirationd = require("expirationd")

function is_expired(args, tuple)
  return true
end

function delete_tuple(space, args, tuple)
  box.space[space]:delete{tuple[1]}
end

expirationd.start(job_name, space.id, is_expired, {
    process_expired_tuple = delete_tuple,
    args = nil,
    tuples_per_iteration = 50,
    full_scan_time = 3600
})

Customized version:

clock = require("clock")

expirationd.start(job_name, space.id, is_expired, {
    -- name or id of the index in the specified space to iterate over
    index = "exp",
    -- one transaction per batch
    -- default is false
    atomic_iteration = true,
    -- delete data that was added a year ago
    -- default is nil
    start_key = function( task )
        return clock.time() - (365*24*60*60)
    end,
    -- delete it from the oldest to the newest
    -- default is ALL
    iterator_type = "GE",
    -- stop the full scan when a lot of tuples have been deleted
    -- the default process_while always returns true
    process_while = function( task )
        if task.expired_tuples_count >= task.args.max_expired_tuples then
            task.expired_tuples_count = 0
            return false
        end
        return true
    end,
    -- this function must return an iterator over the tuples
    iterate_with = function( task )
        return task.index:pairs({ task.start_key() }, { iterator = task.iterator_type })
            :take_while( function( tuple )
                return task:process_while()
            end )
    end,
    args = {
        max_expired_tuples = 1000
    }
})

Testing

$ make deps-full
$ make test

Regression tests run in continuous integration using luatest and are executed in shuffle mode: the order of tests is pseudorandom each time, with a predefined seed. If tests fail in CI, it is better to reproduce the failures with the same seed:

$ make SEED=1334 test
luatest -v --coverage --shuffle all:1334
...

Cartridge role

cartridge.roles.expirationd is a Tarantool Cartridge role for the expirationd package with the following features:

  • It registers expirationd as a Tarantool Cartridge service for easy access to all API calls:

    local task = cartridge.service_get('expirationd').start("task_name", id, is_expired)
    task:kill()
  • You can configure the expirationd role with the cfg entry. expirationd.cfg() has the same parameters with the same meaning.

    Be careful, values from the clusterwide configuration are applied by default to all nodes on each apply_config(). Changing the configuration manually with expirationd.cfg() only affects the current node and does not update values in the clusterwide configuration. The manual change will be overwritten by the next apply_config() call.

  • You can use persistent functions (i.e. created by box.schema.func.create). When configuring, the role first tries to get the function from the global namespace (_G); if the function is not found, it searches box.func for a function with the same name.

    Be careful! All persistent functions must already exist when the config of the expirationd role is validated and applied. So to configure a Cartridge application correctly, you must do it in two steps: first, configure migrations that create the persistent functions and run them; second, apply the expirationd config.

  • The role stops all expirationd tasks on an instance on the role termination.

  • The role can automatically start or kill old tasks from the role configuration:

    expirationd:
      cfg:
        metrics: true
      task_name1:
        space: 579
        is_expired: is_expired_func_name_in__G
        is_master_only: true
        options:
          args:
            - any
          atomic_iteration: false
          force: false
          force_allow_functional_index: true
          full_scan_delay: 1
          full_scan_time: 1
          index: 0
          iterate_with: iterate_with_func_name_in__G
          iteration_delay: 1
          iterator_type: ALL
          on_full_scan_complete: on_full_scan_complete_func_name_in__G
          on_full_scan_error: on_full_scan_error_func_name_in__G
          on_full_scan_start: on_full_scan_start_func_name_in__G
          on_full_scan_success: on_full_scan_success_func_name_in__G
          process_expired_tuple: process_expired_tuple_func_name_in__G
          process_while: process_while_func_name_in__G
          start_key:
          - 1
          tuples_per_iteration: 100
          vinyl_assumed_space_len: 100
          vinyl_assumed_space_len_factor: 1
      task_name2:
        ...

    expirationd.start() has the same parameters with the same meaning, except for the additional optional parameter is_master_only. If true, the task runs only on a master instance. By default, the value is false.

    Be careful with function-name parameters: the string is a key in the global variable _G, and the value must be a function. You need to define the key before initializing the role:

    rawset(_G, "is_expired_func_name_in__G", function(args, tuple)
        -- code of the function
    end)

Tarantool 3.0 role

roles.expirationd is a Tarantool 3.0 role for the expirationd package with the following features:

  • You can configure the expirationd role with the cfg entry (check the example). The cluster configuration allows setting the same parameters as in expirationd.cfg().

  • You can use persistent functions (i.e. created by box.schema.func.create) for expirationd cfg entries. When configuring, the role first tries to get a function from the global namespace (_G); if the function is not found, it searches box.func for a function with the same name. If some functions from the config are missing, expirationd waits for their creation and starts the tasks once all of them are found. You can check the logs to see which functions are missing.

  • The role stops all expirationd tasks on an instance on the role termination.

  • The role can automatically start or kill old tasks from the role configuration.

    roles: [roles.expirationd]
    roles_cfg:
      roles.expirationd:
        cfg:
          metrics: true
        task_name1:
          space: users
          is_expired: is_expired_func_name
          is_master_only: true
          options:
            args:
              - any
            atomic_iteration: false
            force: false
            force_allow_functional_index: true
            full_scan_delay: 1
            full_scan_time: 1
            index: 0
            iterate_with: iterate_with_func_name_in__G
            iteration_delay: 1
            iterator_type: ALL
            on_full_scan_complete: on_full_scan_complete_func_name_in__G
            on_full_scan_error: on_full_scan_error_func_name_in__G
            on_full_scan_start: on_full_scan_start_func_name_in__G
            on_full_scan_success: on_full_scan_success_func_name_in__G
            process_expired_tuple: process_expired_tuple_func_name_in__G
            process_while: process_while_func_name_in__G
            start_key:
              - 1
            tuples_per_iteration: 100
            vinyl_assumed_space_len: 100
            vinyl_assumed_space_len_factor: 1

    expirationd.start() has the same parameters with the same meaning, except for the additional optional parameter is_master_only. If true, the task runs only on a master instance. By default, the value is false.

    Be careful with function parameters: a task will not start until all functions from the config are found. You can define them in user code:

    box.schema.func.create('is_expired_func_name', {
        body = "function(...) return true end",
        if_not_exists = true
    })
    
    -- Or you could define a global variable.
    rawset(_G, "process_while_func_name_in__G", function(...)
        return true
    end)

expirationd's People

Contributors

0x501d, aak74, amdrozdov, andreyaksenov, artdu, better0fdead, bigbes, differentialorange, fizikroot, grishnov, ilmarkov, kostja, leonidvas, ligurio, mkostoevr, nekipelov, oleg-jukovec, opomuc, rtsisyk, stefansaraev, sudobobo, totktonada, ylobankov, zaglex, zloidemon


expirationd's Issues

master-master mode

Currently expirationd doesn't work if it detects that it's running on a slave. This is obsolete, since the server can be in master-master replication mode. Either it should check for something else (e.g. whether the instance it's currently running on is in read-only mode), or there should be an explicit flag to make sure expirationd works fine in master-master mode.
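A minimal sketch of the read-only check this issue proposes (box.info.ro reports whether the instance is in read-only mode):

```lua
-- Sketch: decide whether to process tasks based on instance writability
-- instead of the presence of a replication upstream (master-master friendly).
local function instance_is_writable()
    return box.info.ro == false
end
```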

get_fiber_id is broken

The guardian loop is broken, since get_fiber_id can throw an error if the fiber is dead.
fiber:id() for dead fibers doesn't work starting from Tarantool 1.4 I think.

If a fiber dies before guardian loop checks its status, the guardian itself can die with an exception.

The same bug is present in 1.5 version at http://github.com/mailru/tntlua

Use fiber:status() to find out fiber status, it doesn't throw.

A test case for zombie tasks occasionally fails

http://build.tarantool.org/job/Tarantool/job/expirationd/job/master/8/consoleFull#-972075579bf66fc16-3e06-4e97-85eb-3b8735628d03

[debian-jessie] INDEX_TYPE='TREE' prove -v ./test.lua
[debian-stretch] ./test.lua .. 
[debian-stretch] TAP version 13
[debian-stretch] 1..8
[debian-stretch]     # simple expires test
[debian-stretch]     1..4
[debian-stretch]     ok - nil
[debian-stretch]     ok - nil
[debian-stretch]     ok - nil
[debian-stretch]     ok - Test task executed and moved to archive
[debian-stretch]     # simple expires test: end
[debian-stretch] ok - simple expires test
[debian-stretch]     # execution error test
[debian-stretch]     1..2
[debian-stretch]     ok - nil
[debian-stretch]     ok - Error task executed
[debian-stretch]     # execution error test: end
[debian-stretch] ok - execution error test
[debian-stretch]     # not expired task
[debian-stretch]     1..2
[debian-stretch]     ok - nil
[debian-stretch]     ok - nil
[debian-stretch]     # not expired task: end
[debian-stretch] ok - not expired task
[debian-stretch]     # zombie task kill
[debian-stretch]     1..4
[debian-stretch]     ok - nil
[debian-stretch]     ok - nil
[debian-stretch]     ok - nil
[debian-stretch]     not ok - Zombie task was killed and restarted
[debian-stretch]       ---
[debian-stretch]       filename: /build/tarantool-expirationd-1.0.0.8/./test.lua
[debian-stretch]       trace:
[debian-stretch]       - line: 316
[debian-stretch]         source: '@/build/tarantool-expirationd-1.0.0.8/./test.lua'
[debian-stretch]         filename: /build/tarantool-expirationd-1.0.0.8/./test.lua
[debian-stretch]         what: Lua
[debian-stretch]         namewhat: local
[debian-stretch]         name: fun
[debian-stretch]         src: /build/tarantool-expirationd-1.0.0.8/./test.lua
[debian-stretch]       - line: 210
[debian-stretch]         source: '@builtin/tap.lua'
[debian-stretch]         filename: builtin/tap.lua
[debian-stretch]         what: Lua
[debian-stretch]         namewhat: method
[debian-stretch]         name: test
[debian-stretch]         src: builtin/tap.lua
[debian-stretch]       - line: 0
[debian-stretch]         source: '@/build/tarantool-expirationd-1.0.0.8/./test.lua'
[debian-stretch]         filename: /build/tarantool-expirationd-1.0.0.8/./test.lua
[debian-stretch]         what: main
[debian-stretch]         namewhat: 
[debian-stretch]         src: /build/tarantool-expirationd-1.0.0.8/./test.lua
[debian-stretch]       line: 0
[debian-stretch]       expected: dead
[debian-stretch]       got: suspended
[debian-stretch]       ...
[debian-stretch]     # zombie task kill: end
[debian-stretch] not ok - failed subtests
[debian-stretch]   ---
[debian-stretch]   filename: /build/tarantool-expirationd-1.0.0.8/./test.lua
[debian-stretch]   trace:
[debian-stretch]   - line: 236
[debian-stretch]     source: '@builtin/tap.lua'
[debian-stretch]     filename: builtin/tap.lua
[debian-stretch]     what: Lua
[debian-stretch]     namewhat: method
[debian-stretch]     name: test
[debian-stretch]     src: builtin/tap.lua
[debian-stretch]   - line: 0
[debian-stretch]     source: '@/build/tarantool-expirationd-1.0.0.8/./test.lua'
[debian-stretch]     filename: /build/tarantool-expirationd-1.0.0.8/./test.lua
[debian-stretch]     what: main
[debian-stretch]     namewhat: 
[debian-stretch]     src: /build/tarantool-expirationd-1.0.0.8/./test.lua
[debian-stretch]   planned: 4
[debian-stretch]   failed: 1
[debian-stretch]   line: 0
[debian-stretch]   ...
[debian-stretch]     # multiple expires test
[debian-stretch]     1..2
[debian-stretch]     ok - First part expires done
[debian-stretch]     ok - Multiple expires done
[debian-stretch]     # multiple expires test: end
[debian-stretch] ok - multiple expires test
[debian-stretch]     # default drop function test
[debian-stretch]     1..2
[debian-stretch]     ok - tuples are in space
[debian-stretch]     ok - all tuples are expired with default function
[debian-stretch]     # default drop function test: end
[debian-stretch] ok - default drop function test
[debian-stretch]     # restart test
[debian-stretch]     1..3
[debian-stretch]     ok - new expirationd table
[debian-stretch]     ok - tuples are in space
[debian-stretch]     ok - all tuples are expired
[debian-stretch]     # restart test: end
[debian-stretch] ok - restart test
[debian-stretch]     # complex key test
[debian-stretch]     1..2
[debian-stretch]     ok - tuples are in space
[debian-stretch]     ok - all tuples are expired with default function
[debian-stretch]     # complex key test: end
[debian-stretch] ok - complex key test
[debian-stretch] # failed subtest: 1
[debian-stretch] Dubious, test returned 1 (wstat 256, 0x100)
[debian-stretch] Failed 1/8 subtests 

expirationd not doing anything

I'm trying to use expirationd on my Tarantool instance.
I added this code to my Lua file:

local function is_expired(args, tuple)
    -- logic for the check
    local x = os.time()
    log.debug("check: " .. tuple[1] .. " " .. tuple[3] .. " " .. x)
    if (tuple[3] <= x) or (tuple[4] <= x) then
        return true
    else
        return false
    end
end

local function delete_tuple(space_id, args, tuple)
    log.debug("delete from: " .. space_id .. " " .. tuple[1])
    box.space[space_id]:delete{tuple[1]}
end

for k, v in pairs(config.ttl_spaces) do
    log.debug("ttl_space: " .. v .. " " .. type(delete_tuple))
    expirationd.start(job_clean .. v, box.space[v].id, is_expired, {
        process_expired_tuple = delete_tuple, args = nil,
        tuples_per_iteration = 500, full_scan_time = 60
    })
    expirationd.update()
    log.debug(job_clean .. v .. " was started")
end

After that I restarted the instance, but nothing happened.
In the stats I see that no rows were checked:

tarantuldb-tst1:3301> expirationd.stats('clean_getDictionaries')

  • expired_count: 0
    working_time: 433
    restarts: 1
    checked_count: 0
    ...

tarantuldb-tst1:3301> expirationd.stats('clean_getDictionaries')

  • expired_count: 0
    working_time: 500
    restarts: 1
    checked_count: 0
    ...

Why?

Infinite increase of the fiber counter

If we drop a space that has an expirationd task, the task will endlessly create fibers. I am not claiming it is a fiber bomb, since the fibers die immediately due to the missing-space error, but the fiber counter keeps increasing.

The user must control the existence of the space and the expirationd tasks themselves, but the code should be made as safe as possible.

Potential Bug: CPU Usage Increase

Problem

Allegedly, there is a bug which causes a CPU usage increase.
Apparently, no sleeping occurs inside the infinite loop in a worker fiber.

Possible Reproduction Scenario

There are fewer tuples in the space than the task is configured to process.

Some Comments

I will attach a minimal test case for reproducing the behaviour later.
For now, this is just a suspicion.
Feel free to close the issue if no activity happens for a long time, or contact me and school me on my laziness and lack of responsibility.

Global var and function

In expirationd.lua there is a suspicious global function, expirationd_run_task_obsolete, and a global variable, retval, in expirationd_task_stats. I suppose they should be made local.

Unclear options

  • on_full_scan_success - Function to call after successfully completing a full scan iteration.
  • on_full_scan_error - Function to call after terminating a full scan due to an error.
  • args - Passed to is_tuple_expired and process_expired_tuple() as additional context.
  • full_scan_time - Time required for a full index scan (in seconds).

The last option (full_scan_time) is unclear: does it allow stopping the scan after the specified period of time?

#52 (comment)

Add test for reloading the module with external iteration state via Cartridge hotreload

Use case:

A user creates an expiration task whose expiration check callback or iteration function saves some state into globals or cartridge.vars (e.g. the iterator state, the last tuple value, etc.). When the module is reloaded in a Cartridge role, the user must be able to preserve that state so iteration can continue with it after the reload.

AC:

A new integration test is added for the above scenario and passes in CI.

Truncate long job names

If the user passes a long job name, they will get:

 LuajitError: /usr/local/share/lua/5.1/expirationd.lua:145: Fiber name is too long
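One possible fix sketch: recent Tarantool versions let a fiber name be truncated instead of raising an error via the truncate option when setting the name (its availability on older versions is an assumption):

```lua
local fiber = require('fiber')

-- Truncate an overlong job name instead of failing with
-- "Fiber name is too long".
fiber.self():name(job_name, {truncate = true})
```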

Simplify source code with checking function arguments

In many places in expirationd.lua there are checks like the one below:

    if options.vinyl_assumed_space_len ~= nil then
        if type(options.vinyl_assumed_space_len) ~= 'number' then
            error("Invalid type of vinyl_assumed_space_len value")
        end
        task.vinyl_assumed_space_len = options.vinyl_assumed_space_len
    end

This can be simplified with the checks module, which validates the types of function arguments:

local options_types = {
    ...
    'number', -- options.vinyl_assumed_space_len
    ...
}

local function expirationd_run_task(name, space_id, is_tuple_expired, options)
    checks('string', 'number', 'function', options_types)
    task.vinyl_assumed_space_len = options.vinyl_assumed_space_len

Need to explain `full_scan_time` in more detail

Currently the readme has no explanation of what full_scan_time, iteration_delay, and full_scan_delay are.

  1. "full_scan_time - Time required for a full index scan (in seconds)." - is it the maximum time spent on a full scan, or must this much time be spent scanning?
  2. "iteration_delay - Max sleep time between batches (in seconds)." - why max? Can a batch wake up ahead of time?
  3. "full_scan_delay - Sleep time between full scans (in seconds)." - the full scans are maddening me. How does this work?

Refine master-master mode support

Initially the module relied on box.cfg.replication_source (box.cfg.replication for tarantool-1.7.6+) to avoid starting task processing on a replica. Here the word replica means an instance with at least one configured upstream.

Then the world changed, and it became usual to use bidirectional replication with Tarantool. Most of the time it is not a real master-master setup: all writes go to a single instance at any given moment. But it allows skipping the replication reconfiguration step when switching the instance that handles writes. This decreases the switching time and the probability of mistakes during replica set reconfiguration. box.cfg{read_only = true|false} is often used to protect instances where writes should not be performed.

In this configuration the expirationd module should process tasks only on the writable instance, so checking box.cfg.replication is not sufficient. The force option was added with this idea in mind. It lets a user control where to start task processing and where not to. However, it obliges an application developer to externally stop task processing on an instance that becomes read-only and start it on an instance that becomes writable. This is not quite obvious.

The ideal solution would be the ability to configure expirationd to obtain information about the instance state: say, 'stop processing when box.info.ro becomes true and start it when it becomes false'.

I propose the following API:

local expirationd = require('expirationd')

expirationd.start(name, space_id, is_tuple_expired, {
    <...>
    when = <function>,  -- process tuples only when this condition is true;
                        -- this may be a user-defined function or a predefined one:
                        -- * `expirationd.has_no_upstreams` (it is default)
                        -- * `expirationd.is_writeable` (usually good for master-master setup)
    force = <boolean>,  -- process tuples despite `opts.when` value
})

--
-- Built-in functions to use in `opts.when`.
--
-- Implementations are listed here to make the idea more clear.
--

expirationd.has_no_upstreams = function()
    -- `replication_source` is the old name of `replication` parameter (prior to 1.7.6).
    -- `box.cfg{replication = {}}` may be used to withdraw from upstreams.
    local upstreams = box.cfg.replication or box.cfg.replication_source
    return upstreams == nil or (type(upstreams) == 'table' and next(upstreams) == nil)
end

expirationd.is_writeable = function()
    -- Not sure whether `box.info.ro` can be `nil` during box configuration.
    -- Consider `nil` as 'read-only' just in case.
    return box.info.ro == false
end

The discussion was started in PR #37 by @filonenko-mikhail, then it was stale for a long time. I filed this issue to discuss and finally decide on the most convenient way to configure the module in a multi-instance environment.

regression testing with vinyl is broken

When testing with all Tarantool versions in GH Actions, I don't see output for the TAP test cases; I see only this:

$ INDEX_TYPE='TREE' SPACE_TYPE='vinyl' ./test.lua
TAP version 13
$ cat tarantool.log 
2021-09-10 19:18:52.199 [682718] main/103/test.lua C> Tarantool 2.8.2-3-gf516c93d6
2021-09-10 19:18:52.199 [682718] main/103/test.lua C> log level 5
2021-09-10 19:18:52.199 [682718] main/103/test.lua I> wal/engine cleanup is paused
2021-09-10 19:18:52.199 [682718] main/103/test.lua I> mapping 268435456 bytes for memtx tuple arena...
2021-09-10 19:18:52.200 [682718] main/103/test.lua I> Actual slab_alloc_factor calculated on the basis of desired slab_alloc_factor = 1.044274
2021-09-10 19:18:52.200 [682718] main/103/test.lua I> mapping 134217728 bytes for vinyl tuple arena...
2021-09-10 19:18:52.200 [682718] main/103/test.lua I> instance uuid 62872693-7c9a-4630-ac3a-4b9ef423e5d7
2021-09-10 19:18:52.201 [682718] main/103/test.lua I> tx_binary: stopped
2021-09-10 19:18:52.201 [682718] main/103/test.lua I> initializing an empty data directory
2021-09-10 19:18:52.207 [682718] main/103/test.lua I> assigned id 1 to replica 62872693-7c9a-4630-ac3a-4b9ef423e5d7
2021-09-10 19:18:52.207 [682718] main/103/test.lua I> cluster uuid 08fe6c8f-2028-4d91-9930-7574f8fa9b8f
2021-09-10 19:18:52.207 [682718] snapshot/101/main I> saving snapshot `./00000000000000000000.snap.inprogress'
2021-09-10 19:18:52.216 [682718] snapshot/101/main I> done
2021-09-10 19:18:52.217 [682718] main/103/test.lua I> ready to accept requests
2021-09-10 19:18:52.217 [682718] main/104/gc I> wal/engine cleanup is resumed
2021-09-10 19:18:52.217 [682718] main/103/test.lua I> set 'log_level' configuration option to 5
2021-09-10 19:18:52.217 [682718] main/105/checkpoint_daemon I> scheduled next checkpoint for Fri Sep 10 20:47:40 2021
2021-09-10 19:18:52.217 [682718] main/103/test.lua I> set 'log_format' configuration option to "plain"
2021-09-10 19:18:52.218 [682718] main I> tx_binary: stopped
$ 

TODO: Fix file masks in CLEANUP_FILES (see #71 (comment))

Debian package

Hi!

I found a deb package on Ubuntu trusty with version 1.0.0-6,
but in this project I see only 1.0.0-1.

$ apt-cache policy tarantool-expirationd 
tarantool-expirationd:
  Installed: 1.0.0-6
  Candidate: 1.0.0-6
  Version table:
 *** 1.0.0-6 0
        500 http://download.tarantool.org/tarantool/1.7/ubuntu/ trusty/main amd64 Packages
        100 /var/lib/dpkg/status

Is this repo alive?

RFC: flag for blocking one expiration task per space

Problem

We want a flag that protects users of the module from foot-guns: having too many full scans is painful.

Needs

  • Add a flag that, when passed, prohibits creating another running task for a space if one already exists
  • Backward compatibility must be preserved

run_task: accept space id or name

run_task currently requires a space id. While this is easy, it's an extra mental step. Accept a space name just as well, resolving it to the space id in create_task().
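A sketch of the resolution step create_task() could perform; box.space accepts both names and numeric ids, so one lookup covers both cases (the helper name is illustrative):

```lua
-- Resolve a space name or id to the space object (illustrative helper).
local function resolve_space(id_or_name)
    local space = box.space[id_or_name]
    if space == nil then
        error(string.format("space %s does not exist", tostring(id_or_name)))
    end
    return space
end
```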

Investigate how to support the use-cases from indexpiration

Currently, expirationd does not support iteration over a (secondary) index, since the expiration check callback must be applied to each tuple in a space. This leads to a full scan being applied instead of selecting tuples starting from some index value.

TODO:

  • Investigate how indices are used in the current expirationd implementation (check how the primary index is used; is it possible to iterate over a complex primary index, or a simple or complex secondary index)
  • Propose an RFC for changing the API to support iterating over a specified index or field (the minimal satisfying index must be chosen in this case if it exists)
  • Check how the module is initialized: is it possible to reload the configuration and any created fibers or the internal state at runtime, thus supporting the "hot reload" feature
  • Propose an RFC for changes to support the "hot reload" feature

Does not work with sophia tree multipart index

2016-04-26 18:52:57.818 [10669] main/503/garage_infos_expiration sophia_index.cc:740 E> ER_UNSUPPORTED_INDEX_FEATURE: Index 'user_id_sid_rand' (TREE) of space 'garage_infos' (sophia) does not support partial keys

Space configuration:

local s = box.schema.create_space('sample_space', {
    engine = 'sophia',
    if_not_exists = true,
})
s:create_index('sample_index', {
    type = 'tree',
    parts = {1, 'num', 2, 'str'},
    if_not_exists = true,
})

Pass scan iteration number to callbacks

Many tasks involve some kind of 'folding' over a space, e.g. getting the minimum value of some field across the space at some moment in time. To implement such things via expirationd, we need to know when a new iteration of scanning the space starts. It is proposed to count the scans and pass the number to the callbacks (is_tuple_expired and process_expired_tuple) as a parameter.

It is worth reusing the scheduling and batching from expirationd instead of reinventing them for such tasks.

Wrong example in example.lua

Hi!
The example does not work:

expirationd.start(job_name, space.id, is_expired, {
    process_expired_tuple = delete_tuple, args = nil,
    tuple_per_item = 50, full_scan_time = 3600
})

This works:
expirationd.run_task(job_name, space.id, is_expired, delete_tuple, nil, 50, 3600)

Maybe my Lua experience is too limited, but how is this supposed to work? The sets of parameters are different:
https://github.com/tarantool/expirationd/blob/master/expirationd.lua#L431
https://github.com/tarantool/expirationd/blob/master/expirationd.lua#L341
https://github.com/tarantool/expirationd/blob/master/expirationd.lua#L349

Vinyl is not supported

2016-08-11 09:55:53.016 [5423] main/103/guardian of "garage_infos" I> expiration: task "garage_infos" restarted
2016-08-11 09:55:53.029 [5423] main/132/garage_infos index.cc:188 E> ER_UNSUPPORTED_INDEX_FEATURE: Index 'user_id_sid_rand' (TREE) of space 'garage_infos' (vinyl) does not support size()

Calling expirationd.update() creates orphaned tasks

https://github.com/tarantool/expirationd/blob/master/expirationd.lua#L452

local function expirationd_update()
    local expd_prev = require('expirationd')
    package.loaded['expirationd'] = nil
    local expd_new  = require('expirationd')
    for name, task in pairs(task_list) do
        task:kill()
        expd_new.start(
                task.name, task.space_id,
The function should probably call stop() instead of kill(); otherwise the task gets removed from the task list and the newly started task instance is orphaned: calling expd_new.start(task.name, ...) again will create more identical fibers.
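A sketch of the suggested change, with kill() replaced by stop(); the field names mirror the snippet above, and the remaining start() arguments are omitted just as in the original excerpt:

```lua
local function expirationd_update()
    package.loaded['expirationd'] = nil
    local expd_new = require('expirationd')
    for name, task in pairs(task_list) do
        -- stop() halts the fiber without removing the task from task_list,
        -- so the restarted task is not orphaned.
        task:stop()
        expd_new.start(
                task.name, task.space_id,
                -- ... remaining parameters as in the original snippet
```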

RFC: Adding Iterations over Secondary Indexes

Problem

We want to use secondary indexes to iterate over the space, as is done in indexpiration, but also use all expirationd features such as callback and hotreload.

Needs

  • Make it available to specify the index for iteration
  • It is necessary to preserve backward compatibility, so that if the index is not specified, then the iteration goes as it is now on the primary key

Research

Iterations over Secondary

Currently the implementation is hardcoded to iterate over index zero of a tree or hash type, i.e. the primary index:

local tuples = scan_space.index[0]:select({}, params)
while #tuples > 0 do
    last_id = tuples[#tuples]
    for _, tuple in ipairs(tuples) do
        expiration_process(task, tuple)
    end
    suspend(scan_space, task)
    local key = construct_key(scan_space.id, last_id)
    tuples = scan_space.index[0]:select(key, params)
end

The logic in indexpiration is currently as follows: some field is used to expire by time (only the time and time64 kinds are available).
If the time value in the field is greater than zero, we walk along it in the index:

for _, t in expire_index:pairs({0}, {iterator = box.index.GT}) do
    -- per-tuple processing elided
end

and the expiration check itself is set up depending on opts.kind:

if opts.kind == 'time' or opts.kind == 'time64' then
    if not typeeq(expire_index.parts[1].type, 'num') then
        error(("Can't use field %s as %s"):format(opts.field, opts.kind), 2+depth)
    end
    if opts.kind == 'time' then
        self.check = function(t)
            return t[expire_field_no] - clock.realtime()
        end
    elseif opts.kind == 'time64' then
        self.check = function(t)
            return tonumber(
                ffi.cast('int64_t', t[expire_field_no])
                - ffi.cast('int64_t', clock.realtime64())
            ) / 1e9
        end
    end
elseif _callable(opts.kind) then
    self.check = opts.kind
else
    error(("Unsupported kind: %s"):format(opts.kind), 2+depth)
end

In expirationd we can use the index specified in opts, and perhaps we should also let the user specify the element from which iteration starts, like:

-- as @akudiyar noticed, we need a direction and a stop-iteration function
local iterator = 'GE'
if not ascending then
    iterator = 'LE'
end
local params = {iterator = iterator, limit = task.tuples_per_iteration}
local tuples = scan_space.index.expire_index:select({start_element}, params)
while #tuples > 0 do
    last_id = tuples[#tuples]
    for _, tuple in ipairs(tuples) do
        if stop_iteration(task) then break end
        expiration_process(task, tuple)
    end
    suspend(scan_space, task)
    local key = construct_key(scan_space.id, last_id)
    -- select all tuples greater than the last key
    tuples = scan_space.index.expire_index:select(key, params)
end

Taking the field?

Do we need this feature at all, or is specifying the index enough?
We need to think about how we would accept the field for cases like these:

  • If we want to use a multipart index, do we accept a list of fields? It might be worthwhile to accept the index directly instead of going through a field. indexpiration is passed a single field.
  • What if we accept only one field, but the index consists of several parts? In indexpiration the chosen index is always the one whose first part is our field, regardless of whether it is a multipart index or a single-part one:
    for _, index in pairs(box.space[space_id].index) do
        -- we use only first part of index,
        -- because we will have problems with the starting element
        -- perhaps we should first of all take an index consisting only of our field
        if index.parts[1].fieldno == expire_field_no then
            expire_index = index
        end
    end

Accordingly, the starting element needs to be designed after we understand how the field (or fields) will be taken. And of course, neither the starting element nor ascending order can be used with a HASH index.
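Such a constraint could be enforced with a small guard at task start; a sketch (validate_iteration_opts is a hypothetical helper, and the index is represented here only by its type field):

```lua
-- hypothetical guard: reject options that make no sense for a HASH index
local function validate_iteration_opts(index, opts)
    if index.type == 'HASH' then
        -- a HASH index has no key order, so neither a starting element
        -- nor an ordered iteration direction can be honored
        if opts.start_element ~= nil then
            error("start_element is not supported for HASH indexes")
        end
        if opts.iterator ~= nil and opts.iterator ~= 'ALL' then
            error("only the ALL iterator is supported for HASH indexes")
        end
    end
    return true
end
```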

Transactions

One transaction per batch.
There are no problems if we use one transaction per batch. We also need to make sure that if our stop_iteration function fires, the transaction is still committed.

    local tuples = task.expire_index:select(task.start_element, params)
    while #tuples > 0 do
        last_id = tuples[#tuples]
        if task.trx then
            box.begin()
        end
        for _, tuple in ipairs(tuples) do
            if task:stop_iteration() then
                if task.trx then
                    box.commit()
                end
                goto done
            end
            expiration_process(task, tuple)
        end
        if task.trx then
            box.commit()
        end
        suspend(task)
        local key = construct_key(task.expire_index, last_id)
        -- select all tuples greater than the last key
        tuples = task.expire_index:select(key, params)
    end
    ::done::

Pairs instead of select for tree iteration

As @olegrok noticed in #52 (comment), it's better to use pairs. For example, iteration over the hash index is already done using pairs:

expirationd/expirationd.lua

Lines 104 to 116 in 29d1a25

local function hash_index_iter(scan_space, task)
    -- iteration for hash index
    local checked_tuples_count = 0
    for _, tuple in scan_space.index[0]:pairs(nil, {iterator = box.index.ALL}) do
        checked_tuples_count = checked_tuples_count + 1
        expiration_process(task, tuple)
        -- find out if the worker can go to sleep
        if checked_tuples_count >= task.tuples_per_iteration then
            checked_tuples_count = 0
            suspend(scan_space, task)
        end
    end
end
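A tree-index counterpart built on pairs could look like the sketch below (names reuse the helpers from the snippets above; the key point is reopening pairs after a yield, since an open iterator may be invalidated by suspend):

```lua
local function tree_index_iter(scan_space, task)
    local index = scan_space.index[0]
    local last_key = nil
    local done = false
    while not done do
        done = true
        local checked = 0
        -- a nil key with GT scans from the start; after a suspend we
        -- resume strictly after the last processed key
        for _, tuple in index:pairs(last_key, {iterator = box.index.GT}) do
            expiration_process(task, tuple)
            last_key = construct_key(scan_space.id, tuple)
            checked = checked + 1
            if checked >= task.tuples_per_iteration then
                suspend(scan_space, task)
                done = false  -- reopen the iterator after the yield
                break
            end
        end
    end
end
```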

Proposed API

local format = {
    [1] = {name = "id", type = "string"},
    [2] = {name = "status", type = "string"},
    [3] = {name = "deadline", type = "number"},
    [4] = {name = "other", type = "number"},
}
box.schema.space.create('to_expire', {
    format = format,
})

box.space.to_expire:create_index('primary',
    {unique = true, parts = {1, 'str'}, if_not_exists = true})
box.space.to_expire:create_index('exp',
    {unique = false, parts = {3, 'number', 1, 'str'}, if_not_exists = true})

simple version

Can we use start_key instead of start_element?

expirationd.start("clean_all", box.space.to_expire.id, is_expired,
    {
        index = 'exp',
        trx   =  true,                                                        -- one transaction per batch
        start_element = function() return clock.time() - (365*24*60*60) end,  -- delete data that was added a year ago
        iterator = 'LE',                                                      -- delete it from the oldest to the newest
        stop_full_scan = function( task )
            if task.expired_tuples_count >= task.args.max_expired_tuples then -- stop the full scan if we deleted a lot
                task.expired_tuples_count = 0
                return true
            end
            return false
        end,                                
        args = {
            max_expired_tuples = 1000
        }
    }
)

flexible versions

Mons generator

expirationd.start("clean_all", box.space.to_expire.id,
    function() return true end, -- is_tuple_expired always return true
    {
        index = 'exp',
        trx   =  true,          -- one transaction per batch
        -- to do this we should rewrite tree indexing on pairs
        iterate = function( task )
            return task.space:pairs({ clock.time() - (365*24*60*60) }, { iterator = 'GT' })
                :take_while(function(tuple)
                    -- return false if you want to stop the full scan
                    if is_too_many_expired(task) then
                        return false
                    end
                    return true
                end)
        end,
        args = {
            max_expired_tuples = 1000
        }
    }
)

Maybe we should take the union of the implementations above; the interface would be simpler without take_while:

expirationd.start("clean_all", box.space.to_expire.id,
    function() return true end, -- is_tuple_expired always return true
    {
        index = 'exp',
        trx   =  true, -- one transaction per batch
        -- to do this we should rewrite tree indexing on pairs
        iterate = function( task )
            return task.space:pairs({ clock.time() - (365*24*60*60) }, { iterator = 'GT' })
        end,
        stop_full_scan = function( task )
            if task.expired_tuples_count >= task.args.max_expired_tuples then
                task.expired_tuples_count = 0
                return true
            end
            return false
        end,
        args = {
            max_expired_tuples = 1000
        }
    }
)
-- Template from Sasha
{
    gen_first_batch_iterator_params = function(space)
        local key =  fiber.time() - 86400
        local opts = {iterator = 'GT', limit = 1024}
        return key, opts
    end,
    gen_next_batch_iterator_params = <...>,
}

Updated tuples deleted by expirationd

Problem

expirationd removes tuples that are not yet expired.

Details

expirationd is set up as shown below:
local subs_space = box.schema.space.create('sp', {engine = "memtx"})
subs_space:create_index('primary', { type = "tree", parts = { 1, "string", 2, "string" } })
expirationd.start("sp_remove_deleted", box.space.sp.id, expire_func, { tuples_per_iteration=100, force=true, full_scan_time=2*60*60 })

When the expd fiber meets the API (main) fiber, we lose updated tuples.
It looks like expd reads 100 tuples and suspends. Meanwhile, the API updates tuples in the DB.
Then expd calls expire_func with outdated tuples (which were stored in the cache before the suspend). This leads to errors.

suspend(scan_space, task)

It seems like we could call suspend before the select, but will that fully fix the problem?
It may only help if the fiber isn't interrupted elsewhere.
Does the Tarantool fiber manager switch fibers on any service call (e.g. select), or only on fiber.sleep()?
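One way to narrow the race, as a sketch: re-read each tuple by its primary key right before processing, and act on the fresh version, skipping tuples that vanished while the fiber slept (the wrapper name and the one-part primary key in field 1 are assumptions):

```lua
-- hypothetical wrapper around expiration_process from the snippets above
local function expiration_process_fresh(task, tuple)
    -- the batch read before suspend() may be stale: re-read the tuple
    local pk = tuple[1]  -- assumes a one-part primary key in field 1
    local fresh = box.space[task.space_id]:get(pk)
    if fresh ~= nil then
        -- pass the current version to the callbacks, not the cached one
        expiration_process(task, fresh)
    end
end
```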

Task stop (restart) may freeze for up to task.full_scan_delay seconds

We've encountered expd freezing on

fiber.sleep(task.full_scan_delay)

This happens because worker_fiber is asleep (so the cancel fails), and the fiber calling task.start gets stuck.
This happened in production and led to a malfunctioning cluster, which was pretty nasty.
A quick fix would be to add fiber.testcancel() after the pcall:

local state, err = pcall(task.do_worker_iteration, task)

I can provide more details regarding the reproducer/test case, if necessary.
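Sketched in place, the quick fix would look roughly like this (the pcall line is the one from do_worker in expirationd.lua; the surrounding loop is elided):

```lua
local fiber = require('fiber')

local state, err = pcall(task.do_worker_iteration, task)
-- pcall swallows the "fiber is cancelled" error, so without this the
-- worker proceeds into fiber.sleep(task.full_scan_delay) and task:stop()
-- has to wait it out; testcancel() re-raises the cancellation here
fiber.testcancel()
```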

Json-path index as primary key doesn't work

tarantool> construct_key(tree.id, tree:select()[1])
---
- - {'age': 3}
...

tarantool> tree:get(construct_key(tree.id, tree:select()[1]))
---
- error: 'Supplied key type of part 0 does not match index part type: expected scalar'
...

after: #52
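A possible fix, sketched: build the key with the built-in key_def module (available since Tarantool 2.2), which understands JSON-path index parts, instead of a hand-rolled construct_key:

```lua
local key_def_lib = require('key_def')

-- hypothetical replacement for construct_key: let key_def extract the
-- primary-key value from a tuple, JSON-path parts included
local function construct_key(space_id, tuple)
    local parts = box.space[space_id].index[0].parts
    return key_def_lib.new(parts):extract_key(tuple)
end
```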
