
Comments (5)

yogilad commented on August 30, 2024

Hi @The-Funk,

Can you share more details?
What's the error you get?
Steps to repro?

from azure-kusto-java.

The-Funk commented on August 30, 2024

Hello, I can indeed.

Here is what I use for all of my ingestion operations. It uses the SmallRye Mutiny API. This code works fine with 3.2.1 but fails with 4.0+.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.microsoft.azure.kusto.data.HttpClientProperties;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.storage.StorageException;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.runtime.annotations.RegisterForReflection;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.unchecked.Unchecked;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;

@RegisterForReflection
@ApplicationScoped
public class BuiltIngest {

    private static QueuedIngestClient ingestClient;
    private static String dbName;
    private static final Logger logger = Logger.getLogger(BuiltIngest.class);
    private static final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule())
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

    void onStart(@Observes StartupEvent startupEvent){
        try {
            HttpClientProperties properties = HttpClientProperties.builder()
                    .keepAlive(true)
                    .maxKeepAliveTime(120)
                    .maxIdleTime(60)
                    .maxConnectionsPerRoute(50)
                    .maxConnectionsTotal(50)
                    .build();

            ConnectionStringBuilder icsb =
                    ConnectionStringBuilder.createWithAadApplicationCredentials(
                            ConfigProvider.getConfig().getValue("adx.ingest", String.class),
                            ConfigProvider.getConfig().getValue("adx.appid", String.class),
                            ConfigProvider.getConfig().getValue("adx.appkey", String.class),
                            ConfigProvider.getConfig().getValue("adx.apptenant", String.class));

            // Create the ADX Ingest Client on startup
            ingestClient = IngestClientFactory.createClient(icsb, properties);
            // Set the DB name on startup
            dbName = ConfigProvider.getConfig().getValue("adx.database", String.class);
        }
        catch (Exception e){
            logger.fatal("ADXService Fatal Error 02 - Msg -> " + e.getMessage(), e);
        }
    }

    // -------- Connection Details Above ---------

    public <T> void ingestListToADX(List<T> list, String mappingName, String tableName) {
        try {
            IngestionMapping mapping = new IngestionMapping();
            mapping.setIngestionMappingReference(mappingName, IngestionMapping.IngestionMappingKind.JSON);

            IngestionProperties ingestionProperties = new IngestionProperties(dbName, tableName);
            ingestionProperties.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
            ingestionProperties.setIngestionMapping(mapping);

            StreamSourceInfo streamSourceInfo = new StreamSourceInfo(
                    new ByteArrayInputStream(StandardCharsets.UTF_8.encode(mapper.writeValueAsString(list)).array()));

            ingestDataToADX(streamSourceInfo, ingestionProperties).subscribe().with(
                    item -> logger.info(getIngestionStatus(item)),
                    fail -> logger.error("Unable to ingest list data into ADX.", fail));
        }
        catch (JsonProcessingException pe){
            logger.error("JSON processing failure. Unable to ingest list data into ADX.", pe);
        }
    }

    public <T> void ingestBeanToADX(T bean, String mappingName, String tableName){
        try {
            IngestionMapping mapping = new IngestionMapping();
            mapping.setIngestionMappingReference(mappingName, IngestionMapping.IngestionMappingKind.JSON);

            IngestionProperties ingestionProperties = new IngestionProperties(dbName, tableName);
            ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
            ingestionProperties.setIngestionMapping(mapping);

            StreamSourceInfo streamSourceInfo = new StreamSourceInfo(
                    new ByteArrayInputStream(StandardCharsets.UTF_8.encode(mapper.writeValueAsString(bean)).array()));

            ingestDataToADX(streamSourceInfo, ingestionProperties).subscribe().with(
                    item -> logger.info(getIngestionStatus(item)),
                    fail -> logger.error("Unable to ingest bean data into ADX.", fail));
        }
        catch (JsonProcessingException pe){
            logger.error("JSON processing failure. Unable to ingest bean data into ADX.", pe);
        }
    }

    // Other Methods ---------------------------------------------------------------------------------------------------

    /** Asynchronous ingestion to ADX. */
    private Uni<IngestionResult> ingestDataToADX(StreamSourceInfo data, IngestionProperties ingestProps){
        return Uni.createFrom().item(
                Unchecked.supplier(() -> {
                    try {
                        return ingestClient.ingestFromStream(data, ingestProps);
                    }
                    catch (IngestionClientException | IngestionServiceException e) {
                        throw new ADXLibraryException("Failed to ingest data into ADX.",
                                Response.Status.INTERNAL_SERVER_ERROR);
                    }
                })
        ).emitOn(Infrastructure.getDefaultWorkerPool());
    }

    /** Gets ADX ingestion operation status */
    private String getIngestionStatus(IngestionResult result){
        try {
            return result.getIngestionStatusCollection().get(0).status.toString();
        }
        catch (StorageException | URISyntaxException e) {
            throw new ADXLibraryException("Failed to get ingestion status results.",
                    Response.Status.INTERNAL_SERVER_ERROR);
        }
    }
}
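A side note on the stream payload in the listing, separate from the 4.0+ failure: `ByteBuffer.array()` returns the buffer's whole backing array, and the buffer produced by `Charset.encode` may be allocated larger than the encoded text, so the resulting stream can carry trailing zero bytes. The sketch below uses only the JDK; the class and method names (`Utf8Bytes`, `backingArray`, `exactBytes`) are hypothetical and exist only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Utf8Bytes {

    /** Same pattern as the listing: the whole backing array, which may be longer than the encoded text. */
    public static byte[] backingArray(String text) {
        return StandardCharsets.UTF_8.encode(text).array();
    }

    /** Copies only the bytes between the buffer's position and limit, i.e. the encoded text itself. */
    public static byte[] exactBytes(String text) {
        ByteBuffer buffer = StandardCharsets.UTF_8.encode(text);
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        // Hypothetical payload, standing in for the serialized bean or list.
        String json = "{\"id\":1,\"name\":\"example\"}";
        byte[] padded = backingArray(json);
        byte[] exact = exactBytes(json);

        System.out.println("backing array length: " + padded.length);
        System.out.println("encoded length:       " + exact.length);
        System.out.println("matches getBytes:     "
                + Arrays.equals(exact, json.getBytes(StandardCharsets.UTF_8)));
    }
}
```

If the padding turns out to matter for a given payload, `String.getBytes(StandardCharsets.UTF_8)` or Jackson's `ObjectMapper.writeValueAsBytes(...)` should produce exactly the encoded bytes without the intermediate buffer.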

alonadam commented on August 30, 2024

Hi @The-Funk,
Could you please check whether the problem persists when you don't provide custom HTTP properties?
i.e. IngestClientFactory.createClient(icsb);

The-Funk commented on August 30, 2024

Hi @alonadam, removing the HttpClientProperties (and removing the StorageException, since 4.0.1 no longer has that dependency) resolves the issue. Something in version 4.0.0+ must not like my HTTP client settings.

alonadam commented on August 30, 2024

OK, thanks for confirming. We are working on a fix.
