Running Kafka in distributed mode, data not sinking into OpenSearch via the OpenSearch Kafka connector

I have installed Kafka, OpenSearch, and OpenSearch Dashboards using the brew install command, and so far everything works fine. Now I am working on the OpenSearch Kafka connector, which sinks Kafka topics into OpenSearch. I have configured everything it should need: I created kafka-sink.properties and set the connector.class name there.

Then I edited the connect-distributed.properties file, uncommented the plugin.path line, and saved it.
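As a sanity check on the plugin.path edit, here is a minimal Python sketch that reads the plugin.path value from worker properties text and lists the .jar files found under each entry. The helper names `read_plugin_path` and `jars_by_entry` are hypothetical, and the sketch assumes the value is a plain comma-separated list of directories. The sample value is copied from the DistributedConfig section of the startup log.

```python
from pathlib import Path


def read_plugin_path(properties_text):
    """Return the non-empty entries of plugin.path from worker properties text."""
    for raw in properties_text.splitlines():
        line = raw.strip()
        if line.startswith("#"):
            continue  # the setting is still commented out on this line
        key, sep, value = line.partition("=")
        if sep and key.strip() == "plugin.path":
            # Drop empty entries such as the one left by a trailing comma.
            return [p.strip() for p in value.split(",") if p.strip()]
    return []


def jars_by_entry(entries):
    """Map each plugin.path entry to the jar files found beneath it."""
    found = {}
    for entry in entries:
        root = Path(entry)
        if root.is_dir():
            found[entry] = sorted(str(p) for p in root.rglob("*.jar"))
        else:
            found[entry] = None  # the directory does not exist
    return found


# Sample text: a commented-out line followed by the active setting.
SAMPLE = (
    "#plugin.path=/ignored\n"
    "plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,\n"
)

entries = read_plugin_path(SAMPLE)
print(entries)
```

Calling `jars_by_entry(entries)` on the machine that runs the worker would show which of these directories exist and which jars they hold. The startup log reports the plugin being loaded from /usr/local/share/kafka/plugins/kafka-connect-opensearch.jar, so a listing like this can help confirm which copy of the jar the worker actually picks up.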

After that I put open-search-connector.jar into /usr/local/share/java/ and started Kafka Connect in distributed mode by running the command "./connect-distributed.sh /usr/local/etc/kafka/connect-distributed.properties /usr/local/etc/kafka/kafka-connect-opensearch-sink.properties". I got the output below:

    [2022-03-28 17:38:53,918] INFO WorkerInfo values: 
        jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -XX:MaxInlineLevel=15, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../logs, -Dlog4j.configuration=file:./../config/connect-log4j.properties
        jvm.spec = Amazon.com Inc., OpenJDK 64-Bit Server VM, 1.8.0_282, 25.282-b08
        jvm.classpath = /Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/activation-1.1.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/argparse4j-0.7.0.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/audience-annotations-0.5.0.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/commons-cli-1.4.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/commons-lang3-3.8.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/connect-api-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/connect-basic-auth-extension-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/connect-file-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/connect-json-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/connect-mirror-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/connect-mirror-client-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/connect-runtime-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/connect-transforms-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/hk2-api-2.6.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/hk2-locator-2.6.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/hk2-utils-2.6.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-annotations-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-core-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-databind-2.10.5.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-module-jaxb-annota
tions-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jackson-module-scala_2.12-2.10.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jakarta.inject-2.6.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/javassist-3.25.0-GA.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/javassist-3.26.0-GA.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jaxb-api-2.3.0.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jersey-client-2.31.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jersey-common-2.31.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jersey-container-servlet-2.31.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jersey-container-servlet-core-2.31.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jersey-hk2-2.31.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jersey-media-jaxb-2.31.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jersey-server-2.31.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-client-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-continuation-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-http-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-io-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12
-2.6.2/bin/../libs/jetty-security-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-server-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-servlet-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-servlets-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-util-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jetty-util-ajax-9.4.38.v20210224.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/jopt-simple-5.0.4.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka-clients-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka-log4j-appender-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka-streams-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka-streams-examples-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka-streams-scala_2.12-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka-streams-test-utils-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka-tools-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka_2.12-2.6.2-sources.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/kafka_2.12-2.6.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/log4j-1.2.17.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/lz4-java-1.7.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/maven-artifact-3.6.3.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/metrics-core-2.2.0.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/netty-codec-4.1.59.Final.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/netty-common-4.1.59.Final.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/netty-handler-4.1.59.Final.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/Users/mano
j/Downloads/kafka_2.12-2.6.2/bin/../libs/netty-transport-4.1.59.Final.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/paranamer-2.8.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/plexus-utils-3.2.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/reflections-0.9.12.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/rocksdbjni-5.18.4.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/scala-collection-compat_2.12-2.1.6.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/scala-java8-compat_2.12-0.9.1.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/scala-library-2.12.11.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/scala-logging_2.12-3.9.2.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/scala-reflect-2.12.11.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/slf4j-api-1.7.30.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/snappy-java-1.1.7.3.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/zookeeper-3.5.9.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/zookeeper-jute-3.5.9.jar:/Users/manoj/Downloads/kafka_2.12-2.6.2/bin/../libs/zstd-jni-1.4.4-7.jar
        os.spec = Mac OS X, x86_64, 10.16
        os.vcpus = 16
     (org.apache.kafka.connect.runtime.WorkerInfo:71)
    [2022-03-28 17:38:53,940] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectDistributed:92)
    [2022-03-28 17:38:53,960] INFO Loading plugin from: /usr/local/share/kafka/plugins/kafka-connect-opensearch.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
    [2022-03-28 17:38:55,749] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/share/kafka/plugins/kafka-connect-opensearch.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
    [2022-03-28 17:38:55,750] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-03-28 17:38:55,750] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-03-28 17:38:55,750] INFO Added plugin 'org.apache.kafka.connect.storage.SimpleHeaderConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-03-28 17:38:55,750] INFO Added plugin 'org.apache.kafka.common.config.provider.FileConfigProvider' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-03-28 17:38:55,750] INFO Added plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-03-28 17:38:55,751] INFO Added plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-03-28 17:38:55,751] INFO Added plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-03-28 17:38:55,754] INFO Loading plugin from: 
    [2022-03-28 17:38:56,354] INFO Added aliases 'PrincipalConnectorClientConfigOverridePolicy' and 'Principal' to plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-03-28 17:38:56,418] INFO DistributedConfig values: 
        access.control.allow.methods = 
        access.control.allow.origin = 
        admin.listeners = null
        bootstrap.servers = [localhost:9092]
        client.dns.lookup = use_all_dns_ips
        client.id = 
        config.providers = []
        config.storage.replication.factor = 1
        config.storage.topic = connect-configs
        connect.protocol = sessioned
        connections.max.idle.ms = 540000
        connector.client.config.override.policy = None
        group.id = connect-cluster
        header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
        heartbeat.interval.ms = 3000
        inter.worker.key.generation.algorithm = HmacSHA256
        inter.worker.key.size = null
        inter.worker.key.ttl.ms = 3600000
        inter.worker.signature.algorithm = HmacSHA256
        inter.worker.verification.algorithms = [HmacSHA256]
        internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
        internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
        key.converter = class org.apache.kafka.connect.json.JsonConverter
        listeners = [HTTP://:8083]
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        offset.flush.interval.ms = 10000
        offset.flush.timeout.ms = 5000
        offset.storage.partitions = 25
        offset.storage.replication.factor = 1
        offset.storage.topic = connect-offsets
        plugin.path = [/usr/local/share/java, /usr/local/share/kafka/plugins, /opt/connectors, ]
        rebalance.timeout.ms = 60000
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 40000
        response.http.headers.config = 
        rest.advertised.host.name = 
        rest.advertised.listener = http
        rest.advertised.port = 8083
        rest.extension.classes = []
        rest.host.name = null
        rest.port = 8083
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        scheduled.rebalance.max.delay.ms = 300000
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        status.storage.partitions = 5
        status.storage.replication.factor = 1
        status.storage.topic = connect-status
        task.shutdown.graceful.timeout.ms = 5000
        topic.creation.enable = true
        topic.tracking.allow.reset = true
        topic.tracking.enable = true
        value.converter = class org.apache.kafka.connect.json.JsonConverter
        worker.sync.timeout.ms = 3000
        worker.unsync.backoff.ms = 300000
     (org.apache.kafka.connect.runtime.distributed.DistributedConfig:354)
    [2022-03-28 17:38:56,420] INFO Creating Kafka admin client (org.apache.kafka.connect.util.ConnectUtils:49)

    [2022-03-28 17:38:57,414] INFO [Worker clientId=connect-1, groupId=connect-cluster] Current config state offset -1 is behind group assignment 13, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1196)
    [2022-03-28 17:38:57,416] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished reading to end of log and updated config snapshot, new config log offset: 13 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1203)
    [2022-03-28 17:38:57,416] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset 13 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1257)
    [2022-03-28 17:38:57,416] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1285)
    [2022-03-28 17:38:57,448] INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1619)
    Mar 28, 2022 5:38:57 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
    WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored. 
    Mar 28, 2022 5:38:57 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
    WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored. 
    Mar 28, 2022 5:38:57 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
    WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
    Mar 28, 2022 5:38:57 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
    WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored. 
    Mar 28, 2022 5:38:57 PM org.glassfish.jersey.internal.Errors logErrors
    WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
    WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
    WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
    WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
    WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
    
    [2022-03-28 17:38:57,572] INFO Started o.e.j.s.ServletContextHandler@7cb8437d{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:916)
    [2022-03-28 17:38:57,573] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:319)
    [2022-03-28 17:38:57,573] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)

Right after that, I went to the browser to check whether the connector is registered and listed, using localhost:8083/connectors, but it returns an empty list, [].
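For reference, Kafka Connect in distributed mode expects connector configurations to be submitted through its REST API (a POST to /connectors) rather than read from a properties file given on the command line, which would be consistent with the empty list above. The Python sketch below turns the contents of a connector .properties file into the JSON body for that request and builds the request without sending it. The connector name, class name, topic, and connection URL in the sample are placeholders, not values from the actual files, and the helper names are hypothetical.

```python
import json
import urllib.request


def properties_to_payload(text):
    """Convert connector .properties text into the JSON body for POST /connectors."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments; only "key=value" lines are handled here.
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The REST API takes the connector name at the top level and
    # the remaining settings under "config".
    name = config.pop("name")
    return {"name": name, "config": config}


def build_request(base_url, payload):
    """Build (but do not send) the POST request that registers the connector."""
    return urllib.request.Request(
        base_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder connector settings; replace with the real file contents.
SAMPLE = """
name=opensearch-sink
connector.class=com.example.PlaceholderSinkConnector
topics=my-topic
connection.url=http://localhost:9200
"""

payload = properties_to_payload(SAMPLE)
request = build_request("http://localhost:8083", payload)
print(request.get_method(), request.full_url)
print(json.dumps(payload, indent=2))
```

With the worker running, passing `request` to `urllib.request.urlopen` would submit the configuration. Afterwards localhost:8083/connectors should list the connector name, and localhost:8083/connector-plugins shows which connector classes the worker has loaded.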

Please suggest where I made a mistake and how to make this work.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow