This is not really an issue but a question how to use nifty-modbus in a multithreaded environment.
In my case I use nifty-modbus in a Reactor + Kotlin environment. I want to read multiple Modbus registers multihreaded.
@Singleton
class NiftyModbusAdapter : ModbusAdapter {
@Value("\${modbus.server.ip}")
var server: String = ""
@Value("\${modbus.server.port}")
var serverPort: Int = 0
lateinit var client: TcpNettyModbusClient
private var LOG = LoggerFactory.getLogger(NiftyModbusAdapter::class.java)
@PostConstruct
override fun connect(): Boolean {
val config = NettyTcpModbusClientConfig(server, serverPort)
client = TcpNettyModbusClient(config, NioEventLoopGroup(4), null)
client.start().get()
return true
}
@PreDestroy
override fun disconnect(): Boolean {
client.stop()
return true
}
override fun warmUpInverter(registerCache: RegisterCache, deviceNumber: Int): Mono<RegisterCache> =
fillRegisterCache(
registerCache = registerCache,
deviceNumber = deviceNumber,
registerAddress = 40000,
count = 110
)
.map { registerCache }
override fun minimalWarmUpInverter(registerCache: RegisterCache, deviceNumber: Int): Mono<RegisterCache> =
fillRegisterCache(
registerCache = registerCache,
deviceNumber = deviceNumber,
registerAddress = 40083,
count = 2
).map { registerCache }
override fun warmUpMeter1(registerCache: RegisterCache, deviceNumber: Int): Mono<RegisterCache> =
Mono.`when`(
fillRegisterCache(
registerCache = registerCache,
deviceNumber = deviceNumber,
registerAddress = 40121,
count = 100
),
fillRegisterCache(
registerCache = registerCache,
deviceNumber = deviceNumber,
registerAddress = 40221,
count = 72
)
).then(Mono.just(registerCache))
override fun minimalWarmUpMeter1(registerCache: RegisterCache, deviceNumber: Int): Mono<RegisterCache> =
fillRegisterCache(
registerCache = registerCache,
deviceNumber = deviceNumber,
registerAddress = 40206,
count = 10
).map { registerCache }
override fun warmUpBattery1(registerCache: RegisterCache, deviceNumber: Int): Mono<RegisterCache> =
Mono.`when`(
fillRegisterCache(
registerCache = registerCache,
deviceNumber = deviceNumber,
registerAddress = 57600,
count = 110
),
fillRegisterCache(
registerCache = registerCache,
deviceNumber = deviceNumber,
registerAddress = 57710,
count = 36
)
)
.then(Mono.just(registerCache))
override fun minimalWarmUpBattery1(registerCache: RegisterCache, deviceNumber: Int): Mono<RegisterCache> =
fillRegisterCache(
registerCache = registerCache,
deviceNumber = deviceNumber,
registerAddress = 57716,
count = 22
).map { registerCache }
fun fillRegisterCache(
registerCache: RegisterCache,
deviceNumber: Int,
registerAddress: Int,
count: Int,
): Mono<Boolean> {
val request =
RegistersModbusMessage.readRegistersRequest(ModbusBlockType.Holding, deviceNumber, registerAddress, count)
return Mono.fromFuture {
LOG.debug("sending modbus request to device $deviceNumber and address $registerAddress and length $count")
client.sendAsync(request) as CompletableFuture<ModbusMessage>
}.subscribeOn(Schedulers.boundedElastic())
.map {
LOG.debug("received modbus answer to device $deviceNumber and address $registerAddress and length $count")
readByteArrayAsRegisterList(it.unwrap(RegistersModbusMessage::class.java))
}
.flatMap { registerCache.fillCacheValues(registerStartAddress = registerAddress, data = it) }
.doOnError { LOG.error("unable to read modbus", it) }
.map { true }
.onErrorResume { Mono.just(false) }
}
/**
* The registerModbusMessage's body contains a list of bytes. Two bytes are the content of a Register.
*/
private fun readByteArrayAsRegisterList(registersModbusMessage: RegistersModbusMessage?): List<Register> {
val registerList = mutableListOf<Register>()
if (registersModbusMessage == null) {
return registerList
}
val bytes = registersModbusMessage.dataCopy()
LOG.debug("read from modbus {}", bytes)
for (i in bytes.size downTo 0 step 2) {
if (i > 1) {
registerList.add(Register(byteArrayOf(bytes[(i - 1) - 1], bytes[i - 1])))
}
}
registerList.reverse()
return registerList
}
}
The connection is done while Bean-Creation, this is only once.
In my application I can see the warmup-functions are started at the same time but the results are computed sequentially. The configured EventLoopGroup in the TcpNettyModbusClient just changed where the results were computed sequentially.