bvt123 / SCH
License: GNU General Public License v3.0
LagLive code:
LEFT JOIN
(
    SELECT host_name,
           shard_num
    FROM system.clusters
    WHERE cluster = 'sharded'
) AS hosts ON shard = hosts.shard_num
Make the cluster name configurable without a code change, or add the requirement to the documentation. One possible approach is sketched below.
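A hypothetical approach (not repo code): read the cluster name from the server's macros configuration via getMacro(), so each installation sets it in config.xml rather than in SQL. The macro name sch_cluster is my own placeholder:

LEFT JOIN
(
    SELECT host_name,
           shard_num
    FROM system.clusters
    WHERE cluster = getMacro('sch_cluster') -- hypothetical macro defined in <macros>
) AS hosts ON shard = hosts.shard_num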
When using dictGet() in a Transform we must be sure that the dictionary data is current. But the dictionary is reloaded rather infrequently, so we could get data loss.
We could add the dictionary to the dependencies list and use its last_successful_update_time in LagLive and schBlock() to delay the Transform run, as sketched below.
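A minimal freshness check of this kind could read system.dictionaries (the database name ETL and dictionary name lookup_dict are hypothetical):

-- delay the Transform while the dictionary has not been refreshed recently
SELECT last_successful_update_time >= now() - INTERVAL 5 MINUTE AS is_fresh
FROM system.dictionaries
WHERE database = 'ETL' AND name = 'lookup_dict';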
The current MVP implementation of the scheduler is written in bash and has problems with stability and dependencies (clickhouse-client is required), especially when starting in a containerized environment.
The bash scripts are here: https://github.com/bvt123/SCH/tree/main/bash
Let's rewrite them in Golang as small, compact code and a single binary.
main loop
# poll the LagLive queue every second and hand each job to the processor
while true ; do
    clickhouse-client -q "select * from SCH.LagLive" -f TSVRaw | \
    while IFS=$'\t' read -r topic host sql ts version; do
        process "$topic" "$sql" "$version" "$host"
    done
    sleep 1
done
It gets jobs via an SQL query and executes the received SQL code asynchronously, with a semaphore keyed by a tag (topic) to prevent simultaneous execution of SQL code for a particular topic. The ClickHouse host (cluster node) to connect to is received together with the query.
echo "$query" | clickhouse-client -h "$4" -n -f TSV --param_topic="${1}_p" --log_comment="$HID:$2"
The result of the SQL code is a String or an error message.
Normal output should be sent to the text log.
In case of errors, they should be processed and stored in the text log and in the Offsets and Logs tables:
# append a timestamped warning to the text log
printf '%(%Y-%m-%d %H:%M:%S)T\tWARN\t'"$1-$2"'\t'"$err"'\n' >> "$LOG"
# record the error in the ErrLog table
printf "insert into ETL.ErrLog(topic,err) values('%s','%s')" "$1" "$err" | $CLC 2>> "$LOG"
# duplicate the current offsets row for the topic with the error message attached
printf "insert into SCH.Offsets select topic,last,rows,next,processor,'%s',hostid from SCH.Offsets where topic='%s'" "$err" "$1" |
    $CLC 2>> "$LOG"
For clustered setups where the event stream (e.g. from Kafka) goes to different replicas, some rows from the source table could be lost if the replication lag becomes greater than the delay configured per destination table.
We already have a workaround for that situation in some processing Templates:
set receive_timeout=300;
SYSTEM SYNC REPLICA {src};
SELECT throwLog(count() > 0, 'WARNING', 'Replication is Active')
FROM clusterAllReplicas('replicated', system.replication_queue)
WHERE database = extract('{src}', '(.*?)\.') AND table = extract('{src}', '\.(.*)');
But it looks ugly. While waiting for table replication to become inactive is workable in some cases, checking replication_queue on every cluster node looks very strange.
It could be done better.
When the Prepare for Transform code (Template) reads data from the source table, it can check the ClickHouse insert block sequence in all parts that pass the WHERE filter. Part names contain block numbers, so if some part has not yet been fetched from the remote replica, the block sequence will not be monotonic and an error or warning should be fired, enabling a retry of the Transform attempt.
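For reference, MergeTree part names follow the pattern <partition>_<min_block>_<max_block>_<level> (e.g. all_12_15_1), so elements 2 and 3 of splitByChar('_', _part) are the min and max block numbers. A quick illustration (src_table is a placeholder of mine):

SELECT DISTINCT
    _part,
    splitByChar('_', _part) AS b,
    toUInt32(b[2]) AS min_block,
    toUInt32(b[3]) AS max_block
FROM src_table;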
create or replace function checkBlockSequence as (arr) ->
-- input: array of [start,end] block-number pairs, one per part
-- output: bool, true when the block sequence is not contiguous
-- after sort+flatten+difference, even positions hold the gaps between parts;
-- in a contiguous sequence every gap is 1, so their sum equals their count
arraySum(
    arrayFilter((x, y) -> y % 2 = 0,
        arrayPopFront(
            arrayDifference(
                arrayFlatten(arraySort(arr))
            )
        ) as s,
        arrayEnumerate(s)
    ) as t
) != length(t)
;
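A quick sanity check of the function on hand-made input (my own illustration, not from the repo):

-- [1,3] then [4,6]: the only gap is 4-3=1, the sequence is contiguous -> returns 0
SELECT checkBlockSequence([[1, 3], [4, 6]]);
-- [1,3] then [5,6]: the gap is 5-3=2, a block range is missing -> returns 1
SELECT checkBlockSequence([[1, 3], [5, 6]]);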
insert into SCH.Offsets (topic, next, last, rows, processor, state, hostid)
with getTableDependencies(getSetting('agi_topic'), {delay}) as _deps,
    ( select last from SCH.Offsets where topic = getSetting('agi_topic') ) as _last,
    data as (
        select max(pos) as p, id, splitByChar('_', _part) as block
        from {src}
        where (pos > _last.1 or pos = _last.1 and id > _last.2)
          and pos < _deps.1
        group by id
        order by p, id
        limit {maxstep}
    ),
    ( select count(), max((p, id)),
             checkBlockSequence(groupUniqArray([toUInt32(block[2]), toUInt32(block[3])]))
      from data ) as stat
select topic,
    stat.2,
    last,
    stat.1 as rows,
    if(rows >= {maxstep}, 'FullUniqId', 'UniqId') as processor,
    if(rows > 0, 'processing', _deps.2) as state,
    splitByChar(':', getSetting('log_comment'))[1] as hostid
from SCH.Offsets
where topic = getSetting('agi_topic')
  and next.1 = toDateTime64(0, 3)
  and throwLog(stat.3, 'ERROR', 'Block Sequence Mismatch. It could be too high replication lag.') = 0
The source table should not be partitioned, as that complicates checking block sequences: block numbers are allocated per partition, so ranges from different partitions would interleave and break the monotonicity check.
For loading data from an external DB (like MySQL) we can use the insert ... select technique to pull data from the remote server. The source table should have an updated_at DateTime column with an index on it.
with (select max(updated_at) from dest) as _max
select * from mysql(db1, table=XXX) where updated_at >= _max and updated_at < now() - delay
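A more concrete sketch using the documented mysql() table-function signature (the host, credentials, table name, and the 30-second delay below are placeholders of mine):

insert into dest
with (select max(updated_at) from dest) as _max
select *
from mysql('mysql-host:3306', 'db1', 'src_table', 'user', 'password')
where updated_at >= _max
  and updated_at < now() - interval 30 second;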
"last" position will be updated not by the processor (it doesn't know it), but by MV connected to the dest table
step here should be the number of seconds, not rows.
source table could be any string that will be substituted by the template processor:
The topic is always ready to run and only repeat timeout stops it from executing continuously.
Several types of Jobs:
ClickHouse is a better store for Stage data than Kafka:
https://www.youtube.com/watch?v=ExU7fJFw4Bg
The scheduler uses a tuple (now64(), rowNumberInAllBlocks()) to build a unique ID for incoming events.
That is UInt64+UInt32. It could be done better by using a Snowflake ID (Int64). Its 22 low bits are enough for operating on 4M-row blocks; with 1M-row blocks, we have space left for 4 different cluster nodes.
create function toSnowflake64 as (dt, ch) ->
-- pack the timestamp into the high bits and fold the counter
-- into the low 22 bits (three 22-bit chunks summed, then masked)
bitOr(dateTime64ToSnowflake(dt),
      bitAnd(bitAnd(ch, 0x3FFFFF) +
             bitAnd(bitShiftRight(ch, 22), 0x3FFFFF) +
             bitAnd(bitShiftRight(ch, 44), 0x3FFFFF),
             0x3FFFFF)
);
with rowNumberInAllBlocks() as rn,
     now64() as dt
SELECT hex(toSnowflake64(dt, rn) as sn),
       snowflakeToDateTime64(sn)
from numbers(10);