
cyber-search's Introduction

We are a sovereign, self-funded community of scientists, developers, engineers, and craftsmen.

Currently focused on creating a planet-scale superintelligence: Cyber

The plan is simple:

Teach the Bostrom bootloader to develop a Type I civilization in a greenfield on the Moon. While we stabilize and comprehend him, we will launch the Cyber superintelligence for Earth from the Bostrom bootloader. Then we aim to move the physical Bostrom infrastructure to the Moon, becoming the first sovereign blockchain in space.

The problem is attacked via a self-fulfilling prophecy in a massively collaborative open-source game:

Age of Superintelligence

in 6 episodes

After 7 years of R&D we are still young, and we have released Episode 1: A New Hope

For a limited time, everyone can get cheap moon citizenship in the portal.

70% of the genesis was gifted to ~5M addresses in the Ethereum and Cosmos ecosystems, so the chances that you can claim your BOOT in the portal are high.

We are a cyberian civilization of 10k+ citizens, growing thanks to our products:

cyber

Your superintelligence, focused on soft3 applications. Vision. Repos:

cyb

Your immortal robot for the great web.

Cyb is a pure web3 and soft3 browser.

Use it at cyb.ai

Help us build a better future

May the code be with you

cyber-search's People

Contributors

abcdea, abitrolly, arturalbov, asadovka, cyberadmin, cyborgshead, hleb-albau, irus, kevinlilu, mastercyb, olga584, savetheales, yodamike


cyber-search's Issues

Refactoring of chainPump

  • ethereum chain pump
  • ethereum chain dump
  • check the Kafka-unavailable case -> should retry after a timeout (see the retry sketch after this list)
  • check the Parity-unavailable case -> exception handler, retry after a timeout
  • check the Cassandra-unavailable case -> retry after a timeout
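
A minimal sketch of the retry-after-timeout behavior, assuming the RxJava 2 Flowable pipeline the pump already uses; the helper name and the 30-second delay are illustrative assumptions:

import io.reactivex.Flowable
import java.util.concurrent.TimeUnit

// Hypothetical helper: when a dependency (Kafka, Parity, Cassandra) is
// unavailable, resubscribe to the stream after a fixed timeout.
fun <T> retryAfterTimeout(source: Flowable<T>, delaySeconds: Long = 30): Flowable<T> =
    source.retryWhen { errors ->
        errors.flatMap { error ->
            println("dependency unavailable (${error.message}), retrying in ${delaySeconds}s")
            Flowable.timer(delaySeconds, TimeUnit.SECONDS)
        }
    }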

Apply address deltas using horizontal scaling and CAS (compare-and-set)

Kafka supports horizontal scaling by splitting a single topic into N partitions. An individual consumer can be assigned to each partition.
Cassandra provides CAS support for writes. A write succeeds only if Cassandra still contains the address with the given values (the previous address state). If not, the new address state is returned, and the delta should be applied to that new state for a new CAS write.
The following should be done (a sketch of the CAS loop follows this list):

  • Split each chain's address delta topic into 50 partitions (see ConvertEntityToAddressDeltaProcess). Writes go to a random partition.
  • Define an address delta consumer group of 10 consumers (see ApplyingAddressDeltasProcess)
  • Add CAS writes for addresses (see ApplyingAddressDeltasProcess)
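
A minimal sketch of the CAS apply loop, assuming the Datastax 3.x driver the repositories already use; the table and column names (address, balance) are assumptions for illustration:

import com.datastax.driver.core.Session
import java.math.BigDecimal

// Hypothetical CAS loop: the write succeeds only if the previous state is
// still in place; otherwise re-read the new state and retry.
fun applyDeltaWithCas(session: Session, address: String, delta: BigDecimal) {
    while (true) {
        val row = session.execute("SELECT balance FROM address WHERE id = ?", address).one()
        val current = row?.getDecimal("balance") ?: BigDecimal.ZERO
        val result = session.execute(
            "UPDATE address SET balance = ? WHERE id = ? IF balance = ?",
            current.add(delta), address, current)
        if (result.wasApplied()) return  // CAS succeeded
        // Another consumer updated the row first: loop and apply the delta
        // on top of the new state.
    }
}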

Connector's message format for Kafka

Proposed message format (a Kotlin sketch follows the list):

  • type: string ("block", message type)
  • blockchain: string (bitcoin, ethereum)
  • branch: string (ex. main || testnet <- bitcoin, main || testnet:ropsten || testnet:kovan <- ethereum, etc)
  • block_hash: string (ex. bitcoin main 000000000000000000fe316680c703c4464c8963253ad3feb3be1cfd129e107b)
  • block_number: integer (ex. bitcoin main 480899)
  • body: JSON containing the raw block data and raw txs
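
The same proposal expressed as a Kotlin data class, purely for illustration:

// Sketch of the proposed connector message; field types are assumptions.
data class ConnectorMessage(
    val type: String,        // e.g. "block"
    val blockchain: String,  // "bitcoin", "ethereum"
    val branch: String,      // e.g. "main", "testnet:ropsten"
    val blockHash: String,
    val blockNumber: Long,
    val body: String         // raw JSON with block data and raw txs
)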

Ethereum indexing is too slow

Right now we have problems with indexing speed. From the logs, we can see that the problem lies somewhere in the download-next-blocks area.

12:18:33.067 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398801 block
12:18:33.086 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398802 block
12:18:33.103 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398803 block
12:18:33.129 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398804 block
12:18:33.143 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398805 block
12:18:33.155 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398806 block
12:19:43.019 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398807 block
12:19:43.042 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398808 block
12:19:43.053 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398809 block
12:19:43.060 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398810 block
12:19:43.074 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM 2398811 block 
  1. We should know for sure that the problem arises from the Parity JSON-RPC interface. To confirm, add logs before and after the HTTP queries (see the sketch after this list).
  2. If our assumption is true, the next possible actions are:
  • move the Parity data onto an SSD,
  • start Parity without archive mode (right now >1 TB is required),
  • investigate an IPC solution for the pump.
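
A minimal sketch of the timing logs around an HTTP query; the helper and logger names are illustrative assumptions:

import org.slf4j.LoggerFactory

private val log = LoggerFactory.getLogger("fund.cyber.pump.ChainPump")

// Hypothetical helper: log before and after any Parity JSON-RPC call to
// confirm where the ~70 s pauses in the log above come from.
fun <T> timed(label: String, query: () -> T): T {
    log.info("$label: starting HTTP query")
    val start = System.currentTimeMillis()
    val result = query()
    log.info("$label: finished in ${System.currentTimeMillis() - start} ms")
    return result
}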

Add a two-phase Kafka-Cassandra commit for non-idempotent queries

The main purpose of the address service is to atomically update address values in Cassandra. To achieve this, we split an entity (for example, a bitcoin transaction) into N address updates and transactionally (see Kafka exactly-once) save these entities into Kafka. Then we read the topics with the updates and apply them to Cassandra one by one.
There is a case where we have already updated the address values in Cassandra, but the application cannot successfully save the consumer offset and exits. Also, keep in mind the possibility of parallel address updates.
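
A minimal sketch of one way to make the Cassandra side tolerate replays after a lost offset, assuming a last_applied_block column; all names are illustrative, and in the real service this would need to be combined with the CAS write to stay safe under parallel updates:

import com.datastax.driver.core.Session

// Hypothetical idempotency guard: replaying a delta whose block was already
// applied becomes a no-op.
fun applyDeltaOnce(session: Session, address: String, blockNumber: Long, apply: () -> Unit) {
    val row = session.execute(
        "SELECT last_applied_block FROM address WHERE id = ?", address).one()
    val lastApplied = row?.getLong("last_applied_block") ?: -1L
    if (blockNumber <= lastApplied) return  // already applied before the crash
    apply()
    session.execute(
        "UPDATE address SET last_applied_block = ? WHERE id = ?", blockNumber, address)
}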

search-api container doesn't respond

The search API is not responding, and there is an exception in stdout:

$ docker logs search-api 
ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
16:52:21.154 [main] INFO f.cyber.cassandra.CassandraService - Initializing cassandra service
16:52:21.740 [main] INFO f.cyber.cassandra.CassandraService - Initializing cassandra service finished
Exception in thread "main" java.lang.IllegalArgumentException: Table or materialized view address does not exist in keyspace "ethereum_classic"
	at com.datastax.driver.mapping.AnnotationParser.parseEntity(AnnotationParser.java:76)
	at com.datastax.driver.mapping.MappingManager.getMapper(MappingManager.java:273)
	at com.datastax.driver.mapping.MappingManager.mapper(MappingManager.java:232)
	at fund.cyber.cassandra.repository.EthereumKeyspaceRepository.<init>(EthereumKeyspaceRepository.kt:30)
	at fund.cyber.cassandra.CassandraService$ethereumClassicRepository$2.invoke(CassandraService.kt:49)
	at fund.cyber.cassandra.CassandraService$ethereumClassicRepository$2.invoke(CassandraService.kt:21)
	at kotlin.SynchronizedLazyImpl.getValue(Lazy.kt:130)
	at fund.cyber.cassandra.CassandraService.getEthereumClassicRepository(CassandraService.kt)
	at fund.cyber.search.SearchApiApplication.main(SearchApiApplication.kt:19)

The port is open, but no response comes back:

$ curl http://127.0.0.1:32700/ping
curl: (56) Recv failure: Connection reset by peer

@hleb-albau, maybe I am doing something wrong?

Bitcoin pump doesn't download blocks after 200k blocks

12:49:29.301 [main] INFO fund.cyber.pump.ChainPump - BITCOIN pump start block number is 227602
12:49:33.915 [main] INFO fund.cyber.pump.ChainPump - BITCOIN pump is started
12:49:33.921 [RxCachedThreadScheduler-2] DEBUG f.c.pump.ConcurrentPulledBlockchain - Looking for 227602-227606 blocks for BITCOIN
12:50:43.987 [RxCachedThreadScheduler-6] DEBUG f.c.p.b.JsonRpcBlockToBitcoinBundleConverter - Transactions - Total ids: 224, Cache hits: 35
12:51:32.624 [RxCachedThreadScheduler-7] DEBUG f.c.p.b.JsonRpcBlockToBitcoinBundleConverter - Transactions - Total ids: 271, Cache hits: 14
12:51:32.825 [RxCachedThreadScheduler-4] DEBUG f.c.p.b.JsonRpcBlockToBitcoinBundleConverter - Transactions - Total ids: 2192, Cache hits: 4
12:51:47.587 [RxCachedThreadScheduler-5] DEBUG f.c.p.b.JsonRpcBlockToBitcoinBundleConverter - Transactions - Total ids: 489, Cache hits: 126
12:51:55.409 [RxCachedThreadScheduler-3] DEBUG f.c.p.b.JsonRpcBlockToBitcoinBundleConverter - Transactions - Total ids: 824, Cache hits: 75 

After those log lines, the bitcoin pump gets stuck. Starting points:

  • the bitcoind client crashed with OOM errors (10 GB is not enough). How much does it need?
  • we create too large a tx batch request (the total request is about 4k txs). General question -> find the optimal query size for loading block txs (see the sketch after this list).
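
A minimal sketch of chunking the batch, assuming a synchronous loader; the chunk size of 500 is an illustrative assumption to be tuned:

// Hypothetical fix: instead of one huge batch request (~4k txs), split the
// block's tx ids into fixed-size chunks and load them one chunk at a time.
fun <T> loadInChunks(txIds: List<String>, chunkSize: Int = 500,
                     load: (List<String>) -> List<T>): List<T> =
    txIds.chunked(chunkSize).flatMap { chunk -> load(chunk) }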

Pump gets stuck after an exception

11:02:26.937 [main] ERROR fund.cyber.pump.common.ChainPump - Error during processing stream
org.apache.http.NoHttpResponseException: parity_eth:8545 failed to respond
	at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:141) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) ~[httpcore-4.4.9.jar!/:4.4.9]
	at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) ~[httpcore-4.4.9.jar!/:4.4.9]
	at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) ~[httpcore-4.4.9.jar!/:4.4.9]
	at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) ~[httpcore-4.4.9.jar!/:4.4.9]
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:72) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:221) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:165) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:140) ~[httpclient-4.5.5.jar!/:4.5.5]
	at org.web3j.protocol.http.HttpService.send(HttpService.java:67) ~[core-2.3.0.jar!/:na]
	at org.web3j.protocol.core.Request.send(Request.java:69) ~[core-2.3.0.jar!/:na]
	at fund.cyber.pump.ethereum.client.EthereumBlockchainInterface.lastNetworkBlock(EthereumBlockchainInterface.kt:38) ~[classes!/:na]
	at fund.cyber.pump.common.node.ConcurrentPulledBlockchain.lastNetworkBlock(FlowableBlockchainInterface.kt) ~[common-pumps!/:na]
	at fund.cyber.pump.common.node.ConcurrentPulledBlockchain$generateAvailableBlocksNumbersRangesFunction$1.apply(FlowableBlockchainInterface.kt:34) ~[common-pumps!/:na]
	at fund.cyber.pump.common.node.ConcurrentPulledBlockchain$generateAvailableBlocksNumbersRangesFunction$1.apply(FlowableBlockchainInterface.kt:20) ~[common-pumps!/:na]
	at io.reactivex.internal.operators.flowable.FlowableGenerate$GeneratorSubscription.request(FlowableGenerate.java:109) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:545) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onComplete(FlowableFlatMap.java:673) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.checkTerminated(FlowableFlattenIterable.java:399) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:256) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onComplete(FlowableFlattenIterable.java:194) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onComplete(BasicFuseableSubscriber.java:120) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:120) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableToList$ToListSubscriber.onComplete(FlowableToList.java:86) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:426) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onComplete(FlowableFlatMap.java:673) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onComplete(BasicFuseableSubscriber.java:120) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onComplete(FlowableSubscribeOn.java:108) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:57) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.requestUpstream(FlowableSubscribeOn.java:133) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onSubscribe(FlowableSubscribeOn.java:90) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.Flowable.subscribe(Flowable.java:13234) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.Flowable.subscribe(Flowable.java:13180) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) ~[rxjava-2.1.9.jar!/:na]
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) ~[rxjava-2.1.9.jar!/:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]

API: Expose index stats

We need to see through cyber-ui when the indexer is not functional, for example, when it cannot access the underlying Parity server.

A possible implementation: the error propagates to the http://search.cyber.fund/cybernode status page and changes the cyber-search status to yellow with the exact message "ETH node (Parity) is not accessible for indexing".
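
A minimal sketch of the payload such a status page could consume; the type and field names are assumptions for illustration, not an existing cybernode API:

// Illustrative status payload; field names are assumptions.
data class IndexerStatus(
    val chain: String,    // e.g. "ETHEREUM"
    val status: String,   // "green", "yellow" or "red"
    val message: String?  // e.g. "ETH node (Parity) is not accessible for indexing"
)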

Set up a base connector implementation for Ethereum's Parity

  • History sync via parallel processing of block chunks
  • Continuous sync block by block with timeout retry
  • Request a chunk of blocks via one HTTP connection
  • Stream to Kafka
  • Debug mode without Kafka
  • Transform data to messages (issue #1)
  • Configuration via args for the Kafka port, Parity port, and block start/end processing
  • Monitoring and logging of the connector service

Finalize pump implementation.

Before making the core architecture commitment, we should finish the following subtasks:

  • Ensure that if an error occurs during chain indexation, a retry mechanism repeats it up to N times.
  • Ensure that if the error repeats N times, the application exits (not just becomes a zombie); see the sketch after this list.
  • Delete old kafka-connectors to chains (no lost code).
  • Delete old processing modules (no lost code).
  • Move Cassandra-dependent code from the common, dao-service, and pump modules into a single cassandra-common module.
  • Implement an interface for state storing
  • Genesis transactions
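
A minimal sketch of the bounded-retry-then-exit behavior, assuming a synchronous indexation step; the function name, retry count, and 10-second pause are illustrative assumptions:

import kotlin.system.exitProcess

// Hypothetical sketch: retry the indexation step up to maxRetries times,
// then exit instead of hanging as a zombie process.
fun runWithBoundedRetries(maxRetries: Int = 5, step: () -> Unit) {
    var failures = 0
    while (true) {
        try {
            step()
            return
        } catch (e: Exception) {
            failures++
            if (failures >= maxRetries) {
                System.err.println("Indexation failed $failures times, exiting: ${e.message}")
                exitProcess(1)  // fail fast so the orchestrator can restart us
            }
            Thread.sleep(10_000)  // back off before the next attempt
        }
    }
}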

Error sending message to Kafka

10:59:06.906 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM_CLASSIC 2811165 block
10:59:06.984 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.kafka.KafkaStorage - Initializing kafka storage producer
10:59:07.510 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.kafka.KafkaStorage - Initializing kafka storage producer completed
10:59:08.765 [RxCachedThreadScheduler-1] ERROR fund.cyber.pump.ChainPump - Error during processing ETHEREUM_CLASSIC stream
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1049170 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
	at fund.cyber.pump.kafka.KafkaStorageAction.store(KafkaStorageActionTemplate.kt:25) ~[pumps.jar:na]
	at fund.cyber.pump.ChainPump$initializeStreamProcessing$3.accept(ChainPump.kt:81) ~[pumps.jar:na]
	at fund.cyber.pump.ChainPump$initializeStreamProcessing$3.accept(ChainPump.kt:11) ~[pumps.jar:na]
	at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65) ~[rxjava-2.1.6.jar:na]
	at io.reactivex.internal.operators.flowable.FlowableSkipWhile$SkipWhileSubscriber.onNext(FlowableSkipWhile.java:71) [rxjava-2.1.6.jar:na]
	at io.reactivex.internal.operators.flowable.FlowableScan$ScanSubscriber.onNext(FlowableScan.java:83) [rxjava-2.1.6.jar:na]
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:400) [rxjava-2.1.6.jar:na]
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) [rxjava-2.1.6.jar:na]
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) [rxjava-2.1.6.jar:na]
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) [rxjava-2.1.6.jar:na]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_131]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1049170 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476) ~[guava-19.0.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:435) ~[guava-19.0.jar:na]
	at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:79) ~[guava-19.0.jar:na]
	at fund.cyber.node.common.ConcurrencyKt.awaitAll(Concurrency.kt:17) ~[common.jar:na]
	at fund.cyber.pump.kafka.KafkaStorageAction.store(KafkaStorageActionTemplate.kt:21) ~[pumps.jar:na]
	... 15 common frames omitted
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1049170 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 

Right now we store 3 entities in Kafka. Which entity serializes to 1 MB? Maybe we should add a transparent field for some field collection? As a stopgap, the producer limit can also be raised; a sketch follows.
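
A minimal sketch of raising the producer-side limit with the standard Kafka client configuration; the bootstrap address, serializers, and 5 MB value are illustrative assumptions, and the broker-side message.max.bytes (or per-topic max.message.bytes) may need the same bump:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties

// Raise max.request.size while we investigate which entity grows to ~1 MB.
val props = Properties().apply {
    put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")    // assumed address
    put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5 * 1024 * 1024)  // 5 MB, illustrative
    put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArraySerializer")
    put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArraySerializer")
}
val producer = KafkaProducer<ByteArray, ByteArray>(props)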

Open source features for community

  • Update .gitignore
  • Add .editorconfig
  • Update LICENSE
  • Setup project description
  • Update README (+ setup, FAQ, wiki, dockerhub)
  • Describe Semantic Versioning of cyber-markets
  • Setup CHANGELOG
  • CI badges
  • Add current version badge (and other badges)
  • Describe Code Style
  • Add CONTRIBUTING
  • Add CONTRIBUTORS
  • Add ISSUE_TEMPLATE
  • Add PULL_REQUEST_TEMPLATE
  • Organize labels
  • Describe labels
  • Describe GitFlow (branches and issues)
  • Describe cyber•Search board on organization level (milestones flow)
  • Add CSIP (cyber-search improvement proposal)
  • Describe CSIP (cyber-search improvement proposal)

Add a common fault-tolerant mechanism for interacting with Elassandra

We often use the object mapper for Elassandra without error handling. That was fine at first, but when the service experiences high load there are a lot of timeout exceptions, so we need a common mechanism for handling such errors. After a timeout exception is caught, we need to give Elassandra some time to recover and retry our queries after a certain period.
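
A minimal sketch of such a mechanism, assuming the Datastax driver's OperationTimedOutException is the error being caught; the attempt count and pause values are illustrative assumptions:

import com.datastax.driver.core.exceptions.OperationTimedOutException

// Hypothetical wrapper: pause so Elassandra can recover, then retry with
// exponential backoff; the last attempt lets the exception propagate.
fun <T> withElassandraRetry(maxAttempts: Int = 5, initialPauseMs: Long = 1_000,
                            query: () -> T): T {
    var pause = initialPauseMs
    repeat(maxAttempts - 1) {
        try {
            return query()
        } catch (e: OperationTimedOutException) {
            Thread.sleep(pause)  // give Elassandra time to recover
            pause *= 2           // back off further on each failure
        }
    }
    return query()
}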

Save stream processing state

To ensure exactly-once delivery, we should store the processing state somewhere (a sketch of a state-storage interface follows this list).
Issues come from different sources:

  1. previous connectors/processors may duplicate data.
  2. the Kafka consumer itself
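
A minimal sketch of the state storage the pump could consult on startup; the interface and method names are illustrative assumptions:

// Hypothetical state store: the pump records the last committed block per
// chain, so duplicates from earlier connectors or consumer restarts are skipped.
interface PumpStateStorage {
    fun lastCommittedBlock(chain: String): Long?
    fun commit(chain: String, blockNumber: Long)
}

fun shouldProcess(storage: PumpStateStorage, chain: String, blockNumber: Long): Boolean {
    val last = storage.lastCommittedBlock(chain) ?: -1L
    return blockNumber > last  // already-seen blocks are treated as duplicates
}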

Story - Reliable API

From @Pechalka. As a developer I want our API to be reliable.

  • API finds blocks
  • API finds transactions
  • API finds contracts

For the demo, we query 10 random blocks from Etherscan and verify that our API returns them as well.
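
A rough sketch of one such check, assuming Etherscan's public proxy API; our own endpoint path and port are pure assumptions, and ETHERSCAN_KEY is a placeholder:

import java.net.URL

// Hypothetical demo check: the block hash Etherscan reports must appear in
// our API's response for the same block number.
fun blockMatches(number: Long): Boolean {
    val tag = "0x" + number.toString(16)
    val etherscan = URL("https://api.etherscan.io/api?module=proxy" +
        "&action=eth_getBlockByNumber&tag=$tag&boolean=false&apikey=ETHERSCAN_KEY").readText()
    val ours = URL("http://localhost:8085/ethereum/block/$number").readText()  // assumed endpoint
    val hash = Regex(""""hash":"(0x[0-9a-f]+)"""").find(etherscan)?.groupValues?.get(1)
    return hash != null && ours.contains(hash)
}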

Store search results

If we want to tune our search engine, we should store and analyze search results. For every search, we should store the following fields (a sketch of the record follows the list):

rawRequest,
timestamp,
documentsIndex,
maxScore,
totalHits,
searchTime,
Map<blockchain, hits>
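
The same fields expressed as a Kotlin data class, purely for illustration; the types are assumptions:

import java.time.Instant

// Illustrative record of a single search request; types are assumed.
data class SearchRequestRecord(
    val rawRequest: String,
    val timestamp: Instant,
    val documentsIndex: String,
    val maxScore: Float,
    val totalHits: Long,
    val searchTime: Long,  // milliseconds
    val hitsByBlockchain: Map<String, Long>
)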

Address-service gets stuck

 11:59:10.329 [pool-1-thread-2] ERROR f.c.a.c.ConvertEntityToAddressDeltaProcess - Calculating ETHEREUM addresses deltas for block 2431515 finished with error
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for ETHEREUM_ADDRESS_DELTA-0: 30002 ms has passed since last attempt plus backoff time
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476) ~[guava-19.0.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:455) ~[guava-19.0.jar:na]
	at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:79) ~[guava-19.0.jar:na]
	at fund.cyber.node.common.ConcurrencyKt.awaitAll(Concurrency.kt:17) ~[common.jar:na]
	at fund.cyber.address.common.ConvertEntityToAddressDeltaProcess.processRecord(ConvertEntityToAddressDeltaProcess.kt:86) ~[address-service.jar:na]
	at fund.cyber.node.kafka.KafkaConsumerRunner.readAndProcessRecords(KafkaConsumerRunner.kt:43) [common.jar:na]
	at fund.cyber.node.kafka.KafkaConsumerRunner.run(KafkaConsumerRunner.kt:24) [common.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for ETHEREUM_ADDRESS_DELTA-0: 30002 ms has passed since last attempt plus backoff time
11:59:10.373 [pool-1-thread-3] ERROR f.c.a.c.ConvertEntityToAddressDeltaProcess - Calculating ETHEREUM addresses deltas for block 2431515 finished with error
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for ETHEREUM_ADDRESS_DELTA-0: 30045 ms has passed since last attempt plus backoff time
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476) ~[guava-19.0.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:455) ~[guava-19.0.jar:na]
	at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:79) ~[guava-19.0.jar:na]
	at fund.cyber.node.common.ConcurrencyKt.awaitAll(Concurrency.kt:17) ~[common.jar:na]
	at fund.cyber.address.common.ConvertEntityToAddressDeltaProcess.processRecord(ConvertEntityToAddressDeltaProcess.kt:86) ~[address-service.jar:na]
	at fund.cyber.node.kafka.KafkaConsumerRunner.readAndProcessRecords(KafkaConsumerRunner.kt:43) [common.jar:na]
	at fund.cyber.node.kafka.KafkaConsumerRunner.run(KafkaConsumerRunner.kt:24) [common.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for ETHEREUM_ADDRESS_DELTA-0: 30045 ms has passed since last attempt plus backoff time
11:59:10.381 [pool-1-thread-2] ERROR f.c.node.kafka.KafkaConsumerRunner - consumer record processing error
org.apache.kafka.common.KafkaException: The client hasn't received acknowledgment for some previously sent messages and can no longer retry them. It isn't safe to continue.
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212) ~[kafka-clients-1.0.0.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-1.0.0.jar:na]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
11:59:10.425 [pool-1-thread-3] ERROR f.c.node.kafka.KafkaConsumerRunner - consumer record processing error
org.apache.kafka.common.KafkaException: The client hasn't received acknowledgment for some previously sent messages and can no longer retry them. It isn't safe to continue. 

The address service should not get stuck; it should either fail or retry.
Also, why does this error occur?

API documentation

We use OpenAPI v3 and Swagger to generate our API docs. Right now the docs are incomplete; the following should be done:

  • Add ethereum endpoint
  • Add bitcoin endpoint
  • Enable CI support
  • Deploy on gcloud with a gcloud test endpoint
