BinaryInvalidTypeException in Ignite Remote Filter
The following code is based on a combination of Ignite's CacheQueryExample and CacheContinuousQueryExample.
The code starts a fat Ignite client. Three organizations are created in the cache, and a continuous query listens for updates to the cache. The remote filter is set to pass an event to the continuous query only if the organization name is "Google". Peer class loading is enabled in the default examples XML configuration file (example-ignite.xml), so the expectation is that the remote node is aware of the Organization class.
However, the following exceptions are shown in the Ignite server's console (one for each cache entry), and all three records are returned to the client in the continuous query's event handler instead of just the "Google" record. If the filter is changed to check the key instead of the value, the correct behavior is observed and a single record is returned to the local listener.
[08:28:43,302][SEVERE][sys-stripe-1-#2][query] CacheEntryEventFilter failed: class o.a.i.binary.BinaryInvalidTypeException: o.a.i.examples.model.Organization
[08:28:51,819][SEVERE][sys-stripe-2-#3][query] CacheEntryEventFilter failed: class o.a.i.binary.BinaryInvalidTypeException: o.a.i.examples.model.Organization
[08:28:52,692][SEVERE][sys-stripe-3-#4][query] CacheEntryEventFilter failed: class o.a.i.binary.BinaryInvalidTypeException: o.a.i.examples.model.Organization
To run the code:
- Start an Ignite server using examples/config/example-ignite.xml as the configuration file.
- Replace the content of Ignite's CacheContinuousQueryExample.java with the following code. You may have to change the path to the configuration file to an absolute path.
package org.apache.ignite.examples.datagrid;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityKey;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.model.Organization;
import org.apache.ignite.examples.model.Person;
import org.apache.ignite.lang.IgniteBiPredicate;
import java.util.Collection;
/**
* This example demonstrates the continuous query API.
* <p>
* Remote nodes should always be started with special configuration file which
* enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
* <p>
* Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
* start node with {@code examples/config/example-ignite.xml} configuration.
*/
public class CacheContinuousQueryExample {
/** Organizations cache name. */
private static final String ORG_CACHE = CacheQueryExample.class.getSimpleName() + "Organizations";
/**
* Executes example.
*
* @param args Command line arguments, none required.
* @throws Exception If example execution failed.
*/
public static void main(String[] args) throws Exception {
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
System.out.println();
System.out.println(">>> Cache continuous query example started.");
CacheConfiguration<Long, Organization> orgCacheCfg = new CacheConfiguration<>(ORG_CACHE);
orgCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
orgCacheCfg.setIndexedTypes(Long.class, Organization.class);
// Auto-close cache at the end of the example.
try {
ignite.getOrCreateCache(orgCacheCfg);
// Create new continuous query.
ContinuousQuery<Long, Organization> qry = new ContinuousQuery<>();
// Callback that is called locally when update notifications are received.
qry.setLocalListener(new CacheEntryUpdatedListener<Long, Organization>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Long, ? extends Organization>> evts) {
for (CacheEntryEvent<? extends Long, ? extends Organization> e : evts)
System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
}
});
// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Long, Organization>>() {
@Override public CacheEntryEventFilter<Long, Organization> create() {
return new CacheEntryEventFilter<Long, Organization>() {
@Override public boolean evaluate(CacheEntryEvent<? extends Long, ? extends Organization> e) {
//return e.getKey() == 3;
return e.getValue().name().equals("Google");
}
};
}
});
ignite.getOrCreateCache(ORG_CACHE).query(qry);
// Populate caches.
initialize();
Thread.sleep(2000);
}
finally {
// Distributed cache could be removed from cluster only by #destroyCache() call.
ignite.destroyCache(ORG_CACHE);
}
}
}
/**
* Populate cache with test data.
*/
private static void initialize() {
IgniteCache<Long, Organization> orgCache = Ignition.ignite().cache(ORG_CACHE);
// Clear cache before running the example.
orgCache.clear();
// Organizations.
Organization org1 = new Organization("ApacheIgnite");
Organization org2 = new Organization("Apple");
Organization org3 = new Organization("Google");
orgCache.put(org1.id(), org1);
orgCache.put(org2.id(), org2);
orgCache.put(org3.id(), org3);
}
}
Solution 1:[1]
Here is an interim workaround that involves using and deserializing binary objects. Hopefully, someone can post a proper solution.
Here is the main() function modified to work with BinaryObjects instead of the Organization object (it additionally requires import org.apache.ignite.binary.BinaryObject; at the top of the file):
public static void main(String[] args) throws Exception {
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
System.out.println();
System.out.println(">>> Cache continuous query example started.");
CacheConfiguration<Long, Organization> orgCacheCfg = new CacheConfiguration<>(ORG_CACHE);
orgCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
orgCacheCfg.setIndexedTypes(Long.class, Organization.class);
// Auto-close cache at the end of the example.
try {
ignite.getOrCreateCache(orgCacheCfg);
// Create new continuous query.
ContinuousQuery<Long, BinaryObject> qry = new ContinuousQuery<>();
// Callback that is called locally when update notifications are received.
qry.setLocalListener(new CacheEntryUpdatedListener<Long, BinaryObject>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Long, ? extends BinaryObject>> evts) {
for (CacheEntryEvent<? extends Long, ? extends BinaryObject> e : evts) {
Organization org = e.getValue().deserialize();
System.out.println("Updated entry [key=" + e.getKey() + ", val=" + org + ']');
}
}
});
// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Long, BinaryObject>>() {
@Override public CacheEntryEventFilter<Long, BinaryObject> create() {
return new CacheEntryEventFilter<Long, BinaryObject>() {
@Override public boolean evaluate(CacheEntryEvent<? extends Long, ? extends BinaryObject> e) {
//return e.getKey() == 3;
//return e.getValue().name().equals("Google");
// Constant-first equals() avoids a NullPointerException if the "name" field is null.
return "Google".equals(e.getValue().field("name"));
}
};
}
});
ignite.getOrCreateCache(ORG_CACHE).withKeepBinary().query(qry);
// Populate caches.
initialize();
Thread.sleep(2000);
}
finally {
// Distributed cache could be removed from cluster only by #destroyCache() call.
ignite.destroyCache(ORG_CACHE);
}
}
}
Solution 2:[2]
"Peer class loading is enabled ... so the expectation is that the remote node is aware of the Organization class."
This is the problem. You can't peer class load "model" objects, i.e., objects used to create the table.
Two solutions:
- Deploy the model class(es) to the server ahead of time. The rest of the code, such as the filters, can still be peer class loaded.
- As @rgb1380 demonstrates, you can use BinaryObjects, which is the underlying data format.
Another small point: for the cache to be closed automatically ("auto-close"), the code needs to use a try-with-resources statement, like this:
// Auto-close cache at the end of the example.
try (var cache = ignite.getOrCreateCache(orgCacheCfg)) {
// do stuff
}
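To make the difference concrete, here is a minimal, Ignite-free sketch of the two forms. TrackedResource and AutoCloseSketch are hypothetical names introduced only for this illustration; TrackedResource stands in for any AutoCloseable, such as the cache returned by getOrCreateCache(). It only shows that a plain try block never calls close(), while try-with-resources does.

```java
public class AutoCloseSketch {
    /** Hypothetical stand-in for a closeable resource; it only records whether close() ran. */
    static class TrackedResource implements AutoCloseable {
        boolean closed = false;

        @Override public void close() {
            closed = true;
        }
    }

    /** Plain try/finally, as in the question: nothing calls close() on the resource. */
    static boolean closedAfterPlainTry() {
        TrackedResource res = new TrackedResource();

        try {
            // do stuff
        }
        finally {
            // No close() call here, so the resource stays open.
        }

        return res.closed;
    }

    /** Try-with-resources: close() is called automatically when the block exits. */
    static boolean closedAfterTryWithResources() {
        TrackedResource res = new TrackedResource();

        try (TrackedResource r = res) {
            // do stuff
        }

        return res.closed;
    }

    public static void main(String[] args) {
        System.out.println("plain try closed: " + closedAfterPlainTry());
        System.out.println("try-with-resources closed: " + closedAfterTryWithResources());
    }
}
```

In the question's code, the comment "Auto-close cache at the end of the example" sits above a plain try block, so the cache is never closed automatically; only the explicit destroyCache() call in the finally block cleans it up.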
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | rgb1380 |
| Solution 2 | Stephen Darlington |
