Comments (5)
Hi @The-Funk,
Can you share more details?
What's the error you get?
Steps to repro?
from azure-kusto-java.
Hello, I can indeed.
Here is what I use for all of my ingestion operations. It uses the SmallRye Mutiny API. This code works fine with 3.2.1 but fails when used with 4.0+.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.microsoft.azure.kusto.data.HttpClientProperties;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.storage.StorageException;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.runtime.annotations.RegisterForReflection;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.unchecked.Unchecked;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
@RegisterForReflection
@ApplicationScoped
public class BuiltIngest {

    private static QueuedIngestClient ingestClient;
    private static String dbName;
    private static final Logger logger = Logger.getLogger(BuiltIngest.class);
    private static final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule())
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

    void onStart(@Observes StartupEvent startupEvent) {
        try {
            HttpClientProperties properties = HttpClientProperties.builder()
                    .keepAlive(true)
                    .maxKeepAliveTime(120)
                    .maxIdleTime(60)
                    .maxConnectionsPerRoute(50)
                    .maxConnectionsTotal(50)
                    .build();
            ConnectionStringBuilder icsb =
                    ConnectionStringBuilder.createWithAadApplicationCredentials(
                            ConfigProvider.getConfig().getValue("adx.ingest", String.class),
                            ConfigProvider.getConfig().getValue("adx.appid", String.class),
                            ConfigProvider.getConfig().getValue("adx.appkey", String.class),
                            ConfigProvider.getConfig().getValue("adx.apptenant", String.class));
            // Create the ADX Ingest Client on startup
            ingestClient = IngestClientFactory.createClient(icsb, properties);
            // Set the DB name on startup
            dbName = ConfigProvider.getConfig().getValue("adx.database", String.class);
        }
        catch (Exception e) {
            logger.fatal("ADXService Fatal Error 02 - Msg -> " + e.getMessage(), e);
        }
    }

    // -------- Connection Details Above ---------

    public <T> void ingestListToADX(List<T> list, String mappingName, String tableName) {
        try {
            IngestionMapping mapping = new IngestionMapping();
            mapping.setIngestionMappingReference(mappingName, IngestionMapping.IngestionMappingKind.JSON);
            IngestionProperties ingestionProperties = new IngestionProperties(dbName, tableName);
            ingestionProperties.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
            ingestionProperties.setIngestionMapping(mapping);
            // getBytes returns exactly the encoded bytes; Charset.encode(...).array() exposes
            // the whole backing array, which may carry trailing padding bytes.
            StreamSourceInfo streamSourceInfo = new StreamSourceInfo(
                    new ByteArrayInputStream(mapper.writeValueAsString(list).getBytes(StandardCharsets.UTF_8)));
            ingestDataToADX(streamSourceInfo, ingestionProperties).subscribe().with(
                    item -> logger.info(getIngestionStatus(item)),
                    fail -> logger.error("Unable to ingest list data into ADX.", fail));
        }
        catch (JsonProcessingException pe) {
            logger.error("JSON processing failure. Unable to ingest list data into ADX.", pe);
        }
    }

    public <T> void ingestBeanToADX(T bean, String mappingName, String tableName) {
        try {
            IngestionMapping mapping = new IngestionMapping();
            mapping.setIngestionMappingReference(mappingName, IngestionMapping.IngestionMappingKind.JSON);
            IngestionProperties ingestionProperties = new IngestionProperties(dbName, tableName);
            ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
            ingestionProperties.setIngestionMapping(mapping);
            // Same byte conversion as in ingestListToADX: exact UTF-8 bytes, no padding.
            StreamSourceInfo streamSourceInfo = new StreamSourceInfo(
                    new ByteArrayInputStream(mapper.writeValueAsString(bean).getBytes(StandardCharsets.UTF_8)));
            ingestDataToADX(streamSourceInfo, ingestionProperties).subscribe().with(
                    item -> logger.info(getIngestionStatus(item)),
                    fail -> logger.error("Unable to ingest bean data into ADX.", fail));
        }
        catch (JsonProcessingException pe) {
            logger.error("JSON processing failure. Unable to ingest bean data into ADX.", pe);
        }
    }

    // Other Methods ---------------------------------------------------------------------------------------------------

    /** Asynchronous ingestion to ADX. */
    private Uni<IngestionResult> ingestDataToADX(StreamSourceInfo data, IngestionProperties ingestProps) {
        // Note: emitOn only moves the downstream callbacks onto the worker pool. The blocking
        // ingestFromStream call still runs on the subscribing thread; runSubscriptionOn would
        // offload the call itself.
        return Uni.createFrom().item(
                Unchecked.supplier(() -> {
                    try {
                        return ingestClient.ingestFromStream(data, ingestProps);
                    }
                    catch (IngestionClientException | IngestionServiceException e) {
                        throw new ADXLibraryException("Failed to ingest data into ADX.",
                                Response.Status.INTERNAL_SERVER_ERROR);
                    }
                })
        ).emitOn(Infrastructure.getDefaultWorkerPool());
    }

    /** Gets ADX ingestion operation status */
    private String getIngestionStatus(IngestionResult result) {
        try {
            return result.getIngestionStatusCollection().get(0).status.toString();
        }
        catch (StorageException | URISyntaxException e) {
            throw new ADXLibraryException("Failed to get ingestion status results.",
                    Response.Status.INTERNAL_SERVER_ERROR);
        }
    }
}
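A side note on converting a JSON string to bytes for a StreamSourceInfo, separate from the 4.0 regression discussed here: Charset.encode(...) returns a ByteBuffer, and ByteBuffer.array() hands back the entire backing array, not just the encoded content. On common JDKs that array can be longer than the content, so the stream may end with padding bytes. The sketch below uses only the JDK; the class name Utf8Bytes and its method names are hypothetical and exist only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper class, for illustration only.
final class Utf8Bytes {

    /** Exact UTF-8 bytes of the string, with no padding. */
    static byte[] exact(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Whole backing array of the encoded buffer; may be longer than the content. */
    static byte[] backingArray(String json) {
        return StandardCharsets.UTF_8.encode(json).array();
    }

    /** Copies only the bytes between the buffer's position and limit. */
    static byte[] trimmed(String json) {
        ByteBuffer buffer = StandardCharsets.UTF_8.encode(json);
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        return out;
    }

    public static void main(String[] args) {
        String json = "{\"id\":1,\"name\":\"example\"}";
        System.out.println("exact length:         " + exact(json).length);
        System.out.println("backing array length: " + backingArray(json).length);
        System.out.println("trimmed equals exact: " + Arrays.equals(trimmed(json), exact(json)));
    }
}
```

If the two printed lengths differ on your JDK, the extra bytes are what would be appended to the ingested payload. String.getBytes(StandardCharsets.UTF_8) or ObjectMapper.writeValueAsBytes avoids the question entirely.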
Hi @The-Funk,
Could you please check whether the problem persists when you don't provide custom HTTP client properties?
i.e. IngestClientFactory.createClient(icsb);
Hi @alonadam, removing the HttpClientProperties (and removing the StorageException, since 4.0.1 no longer has that dependency) resolves the issue. Something in version 4.0.0+ must not like my HTTP client settings.
OK, thanks for confirming. We are working on a fix.