Prepare the cluster. Each server node needs four ports:
10800 (JDBC/ODBC and thin clients), 11211 (TCP connector), 47100 (communication SPI), 47500 (discovery SPI)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="igniteInstanceName" value="test-grid-xx1-yy1"/>
<property name="userAttributes">
<map>
<entry key="rack" value="xx1"/>
</map>
</property>
<!-- Enable peer class loading. -->
<property name="peerClassLoadingEnabled" value="true"/>
<!-- Set deployment mode. -->
<property name="deploymentMode" value="ISOLATED"/>
<property name="authenticationEnabled" value="true"/>
<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>
<property name="sqlConfiguration">
<bean class="org.apache.ignite.configuration.SqlConfiguration">
<property name="sqlSchemas">
<list>
<value>DEV</value>
<value>SIT</value>
<value>UAT</value>
<value>VER</value>
</list>
</property>
</bean>
</property>
<property name="cacheConfiguration">
<list>
<!-- Partitioned, transactional cache in the DEV cache group. -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="test"/>
<property name="backups" value="1"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
<!-- Group the cache belongs to. -->
<property name="groupName" value="DEV"/>
</bean>
</list>
</property>
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<!--
Default persistent data region. A cache is bound to this memory region
unless it sets another one in its CacheConfiguration.
-->
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="name" value="Default_Region"/>
<property name="persistenceEnabled" value="true"/>
<!-- 100 MB initial size, 100 GB maximum size -->
<property name="initialSize" value="#{100L * 1024 * 1024}"/>
<property name="maxSize" value="#{100L * 1024 * 1024 * 1024}"/>
<!-- Enabling SEGMENTED_LRU page replacement for this region. -->
<property name="pageReplacementMode" value="SEGMENTED_LRU"/>
<property name="metricsEnabled" value="true"/>
<property name="warmUpConfiguration">
<bean class="org.apache.ignite.configuration.LoadAllWarmUpConfiguration"/>
</property>
</bean>
</property>
</bean>
</property>
<!--
Explicitly configure TCP discovery SPI to provide the list of
initial nodes of the cluster.
-->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="localAddress" value="10.0.0.1"/>
<property name="localPort" value="47500"/>
<property name="localPortRange" value="20"/>
<property name="ackTimeout" value="#{3L * 1000}"/>
<property name="reconnectDelay" value="2000"/>
<property name="reconnectCount" value="5"/>
<property name="connectionRecoveryTimeout" value="#{60L * 1000}"/>
<!-- Setting up IP finder for this cluster -->
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="shared" value="true"/>
<property name="addresses">
<list>
<value>10.0.0.2:47500..47520</value>
<value>10.0.0.3:47500..47520</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<!--
Explicitly configure TCP communication SPI, fixing the local
port range used by this node.
-->
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="localPort" value="47100"/>
<property name="localPortRange" value="20"/>
<property name="connectTimeout" value="#{5L * 1000}"/>
<property name="reconnectCount" value="12"/>
<property name="idleConnectionTimeout" value="#{60L * 1000}"/>
<property name="usePairedConnections" value="true"/>
<property name="connectionsPerNode" value="3"/>
</bean>
</property>
</bean>
</beans>
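Start each server node with this configuration file (./ignite.sh <path-to-config.xml>). Because native persistence is enabled, the cluster starts in the inactive state and has to be activated once before it can be used, for example:
./control.sh --set-state ACTIVE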
Log in to Ignite and create a user:
./sqlline.sh --verbose=true -u 'jdbc:ignite:thin://10.0.0.1:10800,10.0.0.2:10800/SYS'
# The default administrator account is ignite:ignite; usually a dedicated user is created
CREATE USER "wzp" WITH PASSWORD 'wzp';
# Common operations information is exposed through system views: https://ignite.apache.org/docs/latest/monitoring-metrics/system-views
select * from SYS.NODES;
select * from SYS.CACHES;
select * from SYS.SCHEMAS;
select * from SYS.TABLES;
select * from SYS.TABLE_COLUMNS;
select * from SYS.BINARY_METADATA;
Creating the cache with CREATE TABLE avoids deploying custom classes to the server nodes and supports secondary (SQL) indexes.
The cache backing the table is named SQL_<SCHEMA_NAME>_<TABLE_NAME>, here SQL_DEV_PERSON:
CREATE TABLE IF NOT EXISTS DEV.PERSON (
id int,
city_id int,
name varchar,
age int,
company varchar,
PRIMARY KEY (id)
) WITH "TEMPLATE=PARTITIONED,CACHE_GROUP=DEV,BACKUPS=1,ATOMICITY=TRANSACTIONAL,VALUE_TYPE=com.wzp.ignite.bo.Person";
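With the table created, the cluster can be used from a Spring application through the thin client; the beans below set up the client connection and an Ignite-backed Spring transaction manager: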
@Bean
public IgniteClient igniteClient() {
ClientConfiguration cfg = new ClientConfiguration();
cfg.setAddresses("10.0.0.1:10800", "10.0.0.2:10800").setUserName("wzp").setUserPassword("wzp");
cfg.setPartitionAwarenessEnabled(false);
cfg.setTransactionConfiguration(new ClientTransactionConfiguration().setDefaultTxTimeout(10000)
.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC)
.setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ));
return Ignition.startClient(cfg);
}
@Bean("igniteTransactionManager")
public IgniteClientSpringTransactionManager igniteTransactionManager(IgniteClient cli) {
IgniteClientSpringTransactionManager mgr = new IgniteClientSpringTransactionManager();
mgr.setClientInstance(cli);
return mgr;
}
Ignite is essentially a key-value database; values (POJO objects) are automatically serialized into the BinaryObject format before being stored.
package com.wzp.ignite.bo;
@lombok.Data
public class Person implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private int id;
private String name;
private int age;
private String company;
// Even with @QuerySqlField configured, automatic serialization cannot map camelCase to snake_case (cityId -> city_id)
private int city_id;
}
As long as the VALUE_TYPE in the table definition matches the POJO class name, SQL can also query POJO data stored through the key-value API. If a composite primary key is used, the KEY_TYPE in the table definition must be set accordingly.
@Autowired
IgniteClient ignite;
private void insertObject(int key) {
ClientCache<Integer, Person> cache = ignite.cache("SQL_" + schemaName + "_" + "PERSON");
Person person = new Person();
person.setId(key);
person.setCity_id(1001);
person.setAge(1);
person.setName("Serializable");
person.setCompany("MIDEA");
cache.put(key, person);
}
private Person getObject(int key) {
ClientCache<Integer, Person> cache = ignite.cache("SQL_" + schemaName + "_" + "PERSON");
return cache.get(key);
}
To modify several rows atomically, annotate the public method with:
@Transactional(transactionManager = "igniteTransactionManager")
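A minimal sketch of such a method (the method name and the two-row update are illustrative; it assumes the beans above, the injected IgniteClient, and the SQL_DEV_PERSON cache, and the method must be called through the Spring proxy for the annotation to take effect):
@Transactional(transactionManager = "igniteTransactionManager")
public void moveToCompany(int firstId, int secondId, String company) {
    ClientCache<Integer, Person> cache = ignite.cache("SQL_DEV_PERSON");
    // Assumes both rows already exist.
    Person first = cache.get(firstId);
    Person second = cache.get(secondId);
    first.setCompany(company);
    second.setCompany(company);
    // Both puts join the same thread-bound client transaction:
    // they commit together or roll back together.
    cache.put(firstId, first);
    cache.put(secondId, second);
}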
If the Person class definition is not available in the application, table data can also be manipulated with BinaryObjectBuilder;
the builder's type name must match the VALUE_TYPE in the table definition:
private void insertBinaryObject(int key) {
ClientCache<Integer, BinaryObject> binaryCache = ignite.cache("SQL_" + schemaName + "_" + "PERSON").withKeepBinary();
BinaryObjectBuilder builder = ignite.binary().builder("com.wzp.ignite.bo.Person");
builder.setField("id", key);
builder.setField("city_id", 1001);
builder.setField("name", "BinaryObjectBuilder");
builder.setField("age", 30);
builder.setField("company", "MIDEA");
binaryCache.put(key, builder.build());
}
private BinaryObject getBinaryObject(int key) {
ClientCache<Integer, BinaryObject> binaryCache = ignite.cache(cacheName).withKeepBinary();
return binaryCache.get(key);
}
/**
 * Use the key-value API on a table with a composite primary key.
 */
private void insertBinaryObject(int userId, String month) {
ClientCache<BinaryObject, BinaryObject> binaryCache = ignite.cache("SQL_" + schemaName + "_" + "SALLARY")
.withKeepBinary();
// When defining the table, specify KEY_TYPE
BinaryObjectBuilder keyBuilder = ignite.binary().builder("com.wzp.ignite.bo.SallaryKey");
keyBuilder.setField("user_id", userId);
keyBuilder.setField("month", month);
BinaryObjectBuilder valueBuilder = ignite.binary().builder("com.wzp.ignite.bo.SallaryValue");
valueBuilder.setField("salary_before_tax", 100000L);
valueBuilder.setField("tax", 3L);
binaryCache.put(keyBuilder.build(), valueBuilder.build());
}
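Reading back by composite key is symmetric: build a key BinaryObject with the same type name and fields (a sketch assuming the same SALLARY table):
private BinaryObject getBinaryObject(int userId, String month) {
    ClientCache<BinaryObject, BinaryObject> binaryCache = ignite.cache("SQL_" + schemaName + "_" + "SALLARY")
            .withKeepBinary();
    // The key must be assembled exactly as it was on insert.
    BinaryObjectBuilder keyBuilder = ignite.binary().builder("com.wzp.ignite.bo.SallaryKey");
    keyBuilder.setField("user_id", userId);
    keyBuilder.setField("month", month);
    return binaryCache.get(keyBuilder.build());
}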
In thin-client mode, the SQL API must be used to filter data on the server side and to take advantage of secondary indexes;
use setSchema to switch schemas:
private void insertBySql(int key) {
ClientCache<Integer, Person> cache = ignite.cache(cacheName);
cache.query(new SqlFieldsQuery("INSERT INTO person(id, city_id, name, age, company) VALUES(?, ?, ?, ?, ?)")
.setArgs(key, 1001, "SQL", 30, "Midea").setSchema(schemaName)).getAll();
}
private Person selectBySql(int key) {
ClientCache<Integer, Person> cache = ignite.cache(cacheName);
try (FieldsQueryCursor<List<?>> cursor = cache
.query(new SqlFieldsQuery("select * from person where id = ?").setArgs(key)
.setSchema(schemaName))) {
for (List<?> row : cursor) {
Map<String,Object> map=new HashMap<>();
for (int i = 0; i < row.size(); i++) {
// use the column name as the map key
map.put(cursor.getFieldName(i), row.get(i));
}
// deserialize the row into a Person with Jackson
ObjectMapper mapper = new ObjectMapper();
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
return mapper.convertValue(map, Person.class);
}
}
return null;
}
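The same pattern filters on non-key columns server-side; the sketch below (the method name is hypothetical, and a secondary index on city_id would have to be created separately with CREATE INDEX) collects every matching row:
private List<Person> selectByCityBySql(int cityId) {
    ClientCache<Integer, Person> cache = ignite.cache(cacheName);
    List<Person> result = new ArrayList<>();
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
    try (FieldsQueryCursor<List<?>> cursor = cache
            .query(new SqlFieldsQuery("select * from person where city_id = ?").setArgs(cityId)
                    .setSchema(schemaName))) {
        for (List<?> row : cursor) {
            Map<String, Object> map = new HashMap<>();
            for (int i = 0; i < row.size(); i++) {
                map.put(cursor.getFieldName(i), row.get(i));
            }
            result.add(mapper.convertValue(map, Person.class));
        }
    }
    return result;
}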