本文整理汇总了Python中gc.collect方法的典型用法代码示例。如果您正苦于以下问题：Python gc.collect方法的具体用法？Python gc.collect怎么用？Python gc.collect使用的例子？那么恭喜您，这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块gc的用法示例。
在下文中一共展示了gc.collect方法的29个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: loadW2V
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def loadW2V(self, emb_path, type="bin"):
    """Load pre-trained word embeddings into ``self.pre_emb``.

    Args:
        emb_path: Path of the embedding file.
        type: ``"textgz"`` reads a gzip-compressed text file, ``"text"`` reads
            a plain text file; any other value (default ``"bin"``) loads a
            binary word2vec file through ``Word2Vec``.

    For the two text formats every non-blank line is split on whitespace:
    the lower-cased first token becomes the key and the remaining tokens are
    stored as a NumPy array exactly as read (no numeric conversion is done
    here). Entries are added to the existing ``self.pre_emb`` mapping.
    """
    print("Loading W2V data...")
    num_keys = 0
    if type in ("textgz", "text"):
        # this seems faster than gensim non-binary load
        opener = gzip.open if type == "textgz" else open
        # The context manager closes the file even if a line fails to parse.
        with opener(emb_path) as emb_file:
            for line in emb_file:
                tokens = line.strip().split()
                if not tokens:
                    # Blank line: nothing to index, and tokens[0] would raise.
                    continue
                self.pre_emb[tokens[0].lower()] = np.asarray(tokens[1:])
        num_keys = len(self.pre_emb)
    else:
        # Only reached for non-text types; previously "textgz" also fell
        # through to this branch and overwrote the embeddings just loaded.
        self.pre_emb = Word2Vec.load_word2vec_format(emb_path, binary=True)
        self.pre_emb.init_sims(replace=True)
        num_keys = len(self.pre_emb.vocab)
    print("loaded word2vec len ", num_keys)
    gc.collect()
开发者ID:dhwajraj,项目名称:deep-siamese-text-similarity,代码行数:25,
示例2: test_close_auto_generated_session
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_close_auto_generated_session(self, mocker):
    """The session mock is closed exactly once after the client is deleted and collected."""
    import gc
    import requests

    # Setup: build the spec'd mock before patching, then make the patched
    # ``requests.Session`` class return it.
    fake_session = mocker.Mock(spec=requests.Session)
    fake_session.request.return_value = "response"
    patched_session_cls = mocker.patch("requests.Session")
    patched_session_cls.return_value = fake_session

    # Run: send one request, then drop the only reference to the client.
    client = requests_.RequestsClient()
    client.send(("method", "url", {}))
    del client
    gc.collect()

    # Verify
    assert fake_session.close.call_count == 1
开发者ID:prkumar,项目名称:uplink,代码行数:19,
示例3: test_close_auto_created_session
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_close_auto_created_session(self, mocker):
    """The session mock is created with the client's arguments and closed after collection."""
    import asyncio
    import gc
    import aiohttp

    # Setup
    fake_session = mocker.Mock(spec=aiohttp.ClientSession)
    patched_session_cls = mocker.patch("aiohttp.ClientSession")
    patched_session_cls.return_value = fake_session
    args = [1]
    kwargs = {"keyword": 2}

    # Run: Create client
    client = aiohttp_.AiohttpClient.create(*args, **kwargs)

    # Run: Get session (the future is deliberately not bound to a name so
    # nothing outlives the call)
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(asyncio.ensure_future(client.session()))

    # Verify: session created with args
    patched_session_cls.assert_called_with(*args, **kwargs)

    # Verify: dropping the client closes the session
    del client
    gc.collect()
    patched_session_cls.return_value.close.assert_called_with()
开发者ID:prkumar,项目名称:uplink,代码行数:27,
示例4: _place_electrodes
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def _place_electrodes(self, fix_th=True):
    """Add the defined electrodes to a copy of the mesh.

    Parameters
    ----------
    fix_th: bool
        When true (default), ``fix_thin_tetrahedra`` is called on the
        resulting mesh after all electrodes were added.

    Returns
    -------
    w_elec:
        Deep copy of ``self.mesh`` after every electrode in
        ``self.electrode`` has been added to it.
    electrode_surfaces: list
        The second value returned by each ``add_electrode_to_mesh`` call,
        in the order of ``self.electrode``.
    """
    # Work on a copy so ``self.mesh`` itself is left untouched.
    w_elec = copy.deepcopy(self.mesh)
    w_elec.fix_tr_node_ordering()

    electrode_surfaces = []
    for el in self.electrode:
        logger.info('Placing Electrode:\n{0}'.format(str(el)))
        # Each electrode returns the updated mesh, which feeds the next one.
        w_elec, n = el.add_electrode_to_mesh(w_elec)
        electrode_surfaces.append(n)

    w_elec.fix_th_node_ordering()
    w_elec.fix_tr_node_ordering()
    if fix_th:
        logger.info('Improving mesh quality')
        w_elec.fix_thin_tetrahedra()
    gc.collect()
    return w_elec, electrode_surfaces
开发者ID:simnibs,项目名称:simnibs,代码行数:26,
示例5: fix_tr_node_ordering
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def fix_tr_node_ordering(self):
    """Fix the node ordering of the triangles in-place.

    Only triangles that have a corresponding tetrahedron are considered.
    For those, the first two node entries are swapped whenever the triangle
    normal has a positive dot product with the vector going from the
    triangle baricenter to the baricenter of its tetrahedron.
    """
    tetra_of_triangle = self.find_corresponding_tetrahedra()
    has_tetra = tetra_of_triangle != -1
    # Elements of type 2 are the triangles; keep those with a partner only.
    triangle_idx = np.where(self.elm.elm_type == 2)[0][has_tetra]
    tetra_of_triangle = tetra_of_triangle[has_tetra]

    tri_normals = self.triangle_normals().value[triangle_idx]
    centers = self.elements_baricenters().value
    # ``tetra_of_triangle`` is 1-based, hence the ``- 1`` when indexing.
    to_tetra = centers[tetra_of_triangle - 1] - centers[triangle_idx]
    alignment = np.einsum('ij, ij -> i', tri_normals, to_tetra)

    to_flip = triangle_idx[alignment > 0]
    # Swap columns 0 and 1 of the selected rows through a saved copy.
    saved_first = np.copy(self.elm.node_number_list[to_flip, 0])
    self.elm.node_number_list[to_flip, 0] = self.elm.node_number_list[to_flip, 1]
    self.elm.node_number_list[to_flip, 1] = saved_first
    del saved_first
    gc.collect()
开发者ID:simnibs,项目名称:simnibs,代码行数:22,
示例6: unload_group
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def unload_group(chat_id):
    """Write one chat's data to disk and drop it from the in-memory caches.

    The entry ``groups[chat_id]`` is pickled to ``markov/chat_<chat_id>.dat``,
    removed from ``groups`` and ``gcache``, and ``gc_counter`` is decremented.
    When the counter drops below 1 it is reset to ``gc_every_unload`` and a
    garbage collection is forced.

    This is best-effort: ordinary errors are logged and swallowed so callers
    never see them. ``KeyboardInterrupt`` and ``SystemExit`` propagate.
    """
    global gc_counter
    try:
        # Look the entry up BEFORE opening the file: opening with "wb"
        # truncates, so a missing chat_id must not wipe an existing file.
        group_data = groups[chat_id]
        with open("markov/chat_" + str(chat_id) + ".dat", "wb") as f:
            pickle.dump(group_data, f)
        del group_data
        del groups[chat_id]
        gcache.remove(chat_id)
        gc_counter -= 1
        if gc_counter < 1:
            gc_counter = gc_every_unload
            gc.collect()
    except Exception:
        # Keep the original best-effort contract, but leave a trace.
        import logging
        logging.getLogger(__name__).exception(
            "Failed to unload chat %s", chat_id)
开发者ID:39bit,项目名称:Markov_Bot,代码行数:18,
示例7: _wrapper
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def _wrapper(method, check_steps_end):
    """Wrap ``method`` with the MolerTest step and exception checks.

    Args:
        method: Callable to wrap. If it carries a truthy
            ``_already_decorated`` attribute it is returned unchanged.
        check_steps_end: When true, ``MolerTest._check_steps_end()`` is
            called after ``method`` finished.

    Returns:
        The wrapped callable, marked with ``_already_decorated = True``.
    """
    if getattr(method, '_already_decorated', False):
        return method

    @wraps(method)
    def wrapped(*args, **kwargs):
        MolerTest._steps_start()
        caught_exception = None
        # Pre-bind so ``return result`` cannot raise UnboundLocalError when
        # ``method`` raised and the check below did not re-raise.
        result = None
        try:
            result = method(*args, **kwargs)
        except Exception as exc:
            caught_exception = exc
        finally:
            # Runs on every path; receives the exception (or None).
            MolerTest._check_exceptions_occured(caught_exception)
            if check_steps_end:
                MolerTest._check_steps_end()
        gc.collect()
        return result

    wrapped._already_decorated = True
    return wrapped
开发者ID:nokia,项目名称:moler,代码行数:23,
示例8: test_subscription_doesnt_block_subscriber_to_be_garbage_collected
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_subscription_doesnt_block_subscriber_to_be_garbage_collected():
    """Subscribing to a ``Publisher`` must not keep the subscriber alive."""
    from moler.publisher import Publisher

    finalized = []

    class GcSubscriber(object):
        def __del__(self):
            # Record that the finalizer ran.
            finalized.append('Subscriber')

    publisher = Publisher()
    subscriber = GcSubscriber()
    publisher.subscribe(subscriber)

    # Drop the only local reference, then force a collection.
    del subscriber
    gc.collect()

    assert 'Subscriber' in finalized
开发者ID:nokia,项目名称:moler,代码行数:19,
示例9: test_garbage_collected_subscriber_is_not_notified
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_garbage_collected_subscriber_is_not_notified():
    """Only the subscriber that is still alive receives the notification."""
    from moler.publisher import Publisher

    delivered = []

    class GcSubscriber(object):
        def __call__(self, data):
            delivered.append(data)

    publisher = Publisher()
    dropped_subscriber = GcSubscriber()
    kept_subscriber = GcSubscriber()
    publisher.subscribe(subscriber=dropped_subscriber)
    publisher.subscribe(subscriber=kept_subscriber)

    # Remove one subscriber before notifying.
    del dropped_subscriber
    gc.collect()

    publisher.notify_subscribers("data")
    assert len(delivered) == 1
开发者ID:nokia,项目名称:moler,代码行数:22,
示例10: test_subscription_doesnt_block_subscriber_to_be_garbage_collected
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_subscription_doesnt_block_subscriber_to_be_garbage_collected():
    """Subscribing to a ``ThreadedMolerConnection`` must not keep the subscriber alive."""
    from moler.threaded_moler_connection import ThreadedMolerConnection

    finalized = []

    class Subscriber(object):
        def __del__(self):
            # Record that the finalizer ran.
            finalized.append('Subscriber')

    class CloseSubscriber(object):
        pass

    connection = ThreadedMolerConnection()
    observer = Subscriber()
    close_handler = CloseSubscriber()
    connection.subscribe(observer, close_handler)

    # Drop the only local reference to the observer, then force a collection.
    del observer
    gc.collect()

    assert 'Subscriber' in finalized
开发者ID:nokia,项目名称:moler,代码行数:23,
示例11: test_garbage_collected_subscriber_is_not_notified
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_garbage_collected_subscriber_is_not_notified():
    """Only the observer that is still alive receives data from the connection."""
    from moler.threaded_moler_connection import ThreadedMolerConnection

    delivered = []

    class Subscriber(object):
        def __call__(self, data, time_recv):
            delivered.append(data)

    connection = ThreadedMolerConnection()
    dropped_observer = Subscriber()
    kept_observer = Subscriber()
    connection.subscribe(observer=dropped_observer, connection_closed_handler=do_nothing_func)
    connection.subscribe(observer=kept_observer, connection_closed_handler=do_nothing_func)

    # Remove one observer before any data arrives.
    del dropped_observer
    gc.collect()

    connection.data_received("data", datetime.datetime.now())
    MolerTest.sleep(1, True)  # Processing in separate thread so have to wait.
    assert len(delivered) == 1
# --------------------------- resources ---------------------------
开发者ID:nokia,项目名称:moler,代码行数:25,
示例12: test_structured_object_indexing
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_structured_object_indexing(self, shape, index, items_changed,
                                    dt, pat, count, singleton):
    """Structured object reference counting for advanced indexing."""
    # NOTE: no helper variables are introduced on purpose; an extra name
    # bound to a small int could itself shift the refcounts under test.
    zero = 0
    one = 1

    array = np.zeros(shape, dt)

    gc.collect()
    zero_refs_start = sys.getrefcount(zero)
    one_refs_start = sys.getrefcount(one)

    # Test item getting:
    selected = array[index]
    zero_refs_now = sys.getrefcount(zero)
    assert zero_refs_now - zero_refs_start == count * items_changed
    del selected

    # Test item setting:
    array[index] = one
    gc.collect()
    zero_refs_now = sys.getrefcount(zero)
    one_refs_now = sys.getrefcount(one)
    assert zero_refs_start - zero_refs_now == count * items_changed
    assert one_refs_now - one_refs_start == count * items_changed
开发者ID:Frank-qlu,项目名称:recruit,代码行数:25,
示例13: test_structured_object_take_and_repeat
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_structured_object_take_and_repeat(self, dt, pat, count, singleton):
    """Structured object reference counting for specialized functions.

    The older functions such as take and repeat use different code paths
    than item setting (when writing this).
    """
    indices = [0, 1]

    array = np.array([pat] * 3, dt)
    gc.collect()
    refs_start = sys.getrefcount(singleton)

    # ``take`` with two indices.
    taken = array.take(indices)
    refs_after_take = sys.getrefcount(singleton)
    assert refs_after_take - refs_start == count * 2

    # ``repeat`` of the two taken items; the result stays bound to a name
    # so it is still alive when the refcount is read.
    repeated = taken.repeat(10)
    gc.collect()
    refs_after_repeat = sys.getrefcount(singleton)
    assert refs_after_repeat - refs_after_take == count * 2 * 10
开发者ID:Frank-qlu,项目名称:recruit,代码行数:19,
示例14: test_leak_in_structured_dtype_comparison
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_leak_in_structured_dtype_comparison(self):
    """Comparing structured arrays or scalars must not leak references (gh-6250)."""
    # gh-6250
    record_dtype = np.dtype([('a', np.float64),
                             ('b', np.int32),
                             ('d', (str, 5))])

    # Simple case
    a = np.zeros(2, dtype=record_dtype)
    for _ in range(100):
        a == a
    assert_(sys.getrefcount(a) < 10)

    # The case in the bug report.
    refs_before = sys.getrefcount(a)
    first, second = a[0], a[1]
    first == second
    del first, second
    gc.collect()
    refs_after = sys.getrefcount(a)
    assert_equal(refs_before, refs_after)
开发者ID:Frank-qlu,项目名称:recruit,代码行数:22,
示例15: test_env_dirty
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_env_dirty(self):
    """A modified record shows up in ``env.dirty`` and leaves it once collected."""
    import gc

    self.odoo.config['auto_commit'] = False

    def modify_record_then_release():
        # All record references live only inside this function.
        ids = self.odoo.env['res.users'].search([('id', '!=', 1)])
        record = self.user_obj.browse(ids[0])
        self.assertNotIn(record, self.odoo.env.dirty)
        self.assertNotIn(record, record.env.dirty)
        record.name = "Joe"
        self.assertIn(record, self.odoo.env.dirty)
        self.assertIn(record, record.env.dirty)

    modify_record_then_release()
    # Ensure the record has been garbage collected for the next test
    gc.collect()
    self.assertEqual(list(self.odoo.env.dirty), [])
开发者ID:OCA,项目名称:odoorpc,代码行数:20,
示例16: clear
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def clear(self):
    """Reset the user and every cache attribute to a fresh, empty container."""
    self.user = None
    self._users = weakref.WeakValueDictionary()
    for cache_name in ('_emojis', '_calls', '_guilds', '_voice_clients'):
        setattr(self, cache_name, {})

    # LRU of max size 128
    self._private_channels = OrderedDict()
    # extra dict to look up private channels by user id
    self._private_channels_by_user = {}

    # A falsy ``max_messages`` is stored as-is instead of creating a deque.
    limit = self.max_messages
    self._messages = deque(maxlen=limit) if limit else limit

    # In cases of large deallocations the GC should be called explicitly
    # To free the memory more immediately, especially true when it comes
    # to reconnect loops which cause mass allocations and deallocations.
    gc.collect()
开发者ID:Rapptz,项目名称:discord.py,代码行数:20,
示例17: test_leaks
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_leaks(self):
    """Items from every creator are gone from the weak set after being stopped."""
    def _get_items():
        created, stoppers = [], []
        for creator in _CREATORS:
            item, stop = creator()
            created.append(item)
            stoppers.append(stop)

        for stop in stoppers:
            stop()
        # Only weak references escape this function.
        return weakref.WeakSet(created)

    items = _get_items()
    _pause()
    # If this next line were uncommented, it would work without all the
    # weakrefs in the code.
    # gc.collect()
    self.assertEqual(list(items), [])
开发者ID:ManiacalLabs,项目名称:BiblioPixel,代码行数:18,
示例18: test_unclosed_client_session_issue_645_in_async_mode
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_unclosed_client_session_issue_645_in_async_mode(self):
    """No "Unclosed client session" message reaches the loop's exception handler."""
    session_unclosed = False

    def exception_handler(_, context):
        # Flip the flag when the loop reports an unclosed session.
        nonlocal session_unclosed
        if context["message"] == "Unclosed client session":
            session_unclosed = True

    async def issue_645():
        client = WebClient(base_url="http://localhost:8888", timeout=1, run_async=True)
        try:
            await client.users_list(token="xoxb-timeout")
        except asyncio.TimeoutError:
            pass

    event_loop = asyncio.get_event_loop()
    event_loop.set_exception_handler(exception_handler)
    event_loop.run_until_complete(issue_645())
    gc.collect()  # force Python to gc unclosed client session
    self.assertFalse(session_unclosed, "Unclosed client session")
开发者ID:slackapi,项目名称:python-slackclient,代码行数:21,
示例19: test_write_del
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_write_del():
    """Deleting a started writer generator leaves no new ffmpeg process behind."""
    for _ in range(N):
        baseline = get_ffmpeg_pids()
        writer = imageio_ffmpeg.write_frames(test_file2, (64, 64))
        new_before_start = get_ffmpeg_pids().difference(baseline)  # generator has not started
        writer.send(None)
        writer.send(b"x" * 64 * 64 * 3)
        new_while_running = get_ffmpeg_pids().difference(baseline)  # now ffmpeg is running
        del writer
        gc.collect()
        new_after_del = get_ffmpeg_pids().difference(baseline)  # now its not
        assert len(new_before_start) == 0
        assert len(new_while_running) == 1
        assert len(new_after_del) == 0
开发者ID:imageio,项目名称:imageio-ffmpeg,代码行数:18,
示例20: test_threading
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def test_threading():
    """Read first frames from many threads at once while forcing collections."""
    # See issue #20
    num_threads = 16
    num_frames = 5

    def read_first_frames(out_queue, repeats):
        for _ in range(repeats):
            reader = imageio_ffmpeg.read_frames(test_file1)
            next(reader)  # meta data
            out_queue.put(next(reader))  # first frame

    frame_queue = queue.Queue()
    workers = []
    for _ in range(num_threads):
        worker = threading.Thread(target=read_first_frames, args=(frame_queue, num_frames))
        worker.daemon = True
        worker.start()
        workers.append(worker)

    # Consume exactly one frame per (thread, repeat) pair.
    for i in range(num_threads * num_frames):
        print(i, end=" ")
        frame_queue.get()
        gc.collect()  # this seems to help invoke the segfault earlier
开发者ID:imageio,项目名称:imageio-ffmpeg,代码行数:26,
示例21: __getitem__
点赞 6
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def __getitem__(self, item):
    """Look up data by time, optionally with filters.

    ``item`` is either the time argument itself or a tuple/list whose first
    two entries are the time argument and the filters.

    * A ``slice`` is forwarded to ``self.collect(start, stop, filters=...)``.
    * A ``datetime`` or ``str`` is resolved with ``self.find_closest``; the
      file found is passed to ``self.read``, or ``None`` is returned when no
      file was found.
    * Any other type yields ``None``.
    """
    if isinstance(item, (tuple, list)):
        time_args, filters = item[0], item[1]
    else:
        time_args, filters = item, None

    if isinstance(time_args, slice):
        return self.collect(time_args.start, time_args.stop, filters=filters)

    if not isinstance(time_args, (datetime, str)):
        return None

    filename = self.find_closest(time_args, filters=filters)
    return None if filename is None else self.read(filename)
开发者ID:atmtools,项目名称:typhon,代码行数:20,
示例22: deletePreEmb
点赞 5
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def deletePreEmb(self):
    """Replace ``self.pre_emb`` with a new empty dict and force a collection."""
    self.pre_emb = {}
    gc.collect()
开发者ID:dhwajraj,项目名称:deep-siamese-text-similarity,代码行数:5,
示例23: getDataSets
点赞 5
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def getDataSets(self, training_paths, max_document_length, percent_dev, batch_size, is_char_based):
    """Load sentence pairs, build the vocabulary and split into train/dev sets.

    Returns a 4-tuple ``(train_set, dev_set, vocab_processor,
    sum_no_of_batches)`` where each set is ``(x1, x2, y)`` and
    ``sum_no_of_batches`` is ``len(y_train) // batch_size``.

    The shuffle is seeded with a fixed value (131), which also reseeds
    NumPy's global random state.
    """
    loader = self.getTsvDataCharBased if is_char_based else self.getTsvData
    x1_text, x2_text, y = loader(training_paths)

    # Build vocabulary
    print("Building vocabulary")
    vocab_processor = MyVocabularyProcessor(max_document_length,min_frequency=0,is_char_based=is_char_based)
    vocab_processor.fit_transform(np.concatenate((x2_text,x1_text),axis=0))
    print("Length of loaded vocabulary ={}".format( len(vocab_processor.vocabulary_)))

    x1 = np.asarray(list(vocab_processor.transform(x1_text)))
    x2 = np.asarray(list(vocab_processor.transform(x2_text)))

    # Randomly shuffle data
    np.random.seed(131)
    shuffle_indices = np.random.permutation(np.arange(len(y)))
    x1_shuffled = x1[shuffle_indices]
    x2_shuffled = x2[shuffle_indices]
    y_shuffled = y[shuffle_indices]
    # Negative index counted from the end: the last ``percent_dev`` percent
    # form the dev set. With ``percent_dev == 0`` this is 0, so the slices
    # below give an empty train set and put everything into dev.
    dev_idx = -1*len(y_shuffled)*percent_dev//100
    del x1, x2

    # Split train/test set
    self.dumpValidation(x1_text,x2_text,y,shuffle_indices,dev_idx,0)
    # TODO: This is very crude, should use cross-validation
    x1_train, x1_dev = x1_shuffled[:dev_idx], x1_shuffled[dev_idx:]
    x2_train, x2_dev = x2_shuffled[:dev_idx], x2_shuffled[dev_idx:]
    y_train, y_dev = y_shuffled[:dev_idx], y_shuffled[dev_idx:]
    print("Train/Dev split for {}: {:d}/{:d}".format(training_paths, len(y_train), len(y_dev)))

    sum_no_of_batches = len(y_train)//batch_size
    train_set = (x1_train, x2_train, y_train)
    dev_set = (x1_dev, x2_dev, y_dev)
    gc.collect()
    return train_set,dev_set,vocab_processor,sum_no_of_batches
开发者ID:dhwajraj,项目名称:deep-siamese-text-similarity,代码行数:39,
示例24: getTestDataSet
点赞 5
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def getTestDataSet(self, data_path, vocab_path, max_document_length):
    """Load test sentence pairs and encode them with a saved vocabulary.

    Args:
        data_path: Passed to ``self.getTsvTestData`` to read the test data.
        vocab_path: Location the vocabulary processor is restored from.
        max_document_length: Passed to ``MyVocabularyProcessor``.

    Returns:
        ``(x1, x2, y)``: both text columns transformed by the restored
        vocabulary processor and converted to NumPy arrays, plus the labels
        exactly as returned by ``getTsvTestData``.
    """
    x1_temp,x2_temp,y = self.getTsvTestData(data_path)

    # Restore the saved vocabulary
    vocab_processor = MyVocabularyProcessor(max_document_length,min_frequency=0)
    vocab_processor = vocab_processor.restore(vocab_path)
    # Function-call form: valid on Python 3, and with a single argument it
    # prints the same text on Python 2.
    print(len(vocab_processor.vocabulary_))

    x1 = np.asarray(list(vocab_processor.transform(x1_temp)))
    x2 = np.asarray(list(vocab_processor.transform(x2_temp)))
    # The processor is no longer needed once both columns are encoded.
    del vocab_processor
    gc.collect()
    return x1,x2, y
开发者ID:dhwajraj,项目名称:deep-siamese-text-similarity,代码行数:16,
示例25: __iter__
点赞 5
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def __iter__(self):
    """Yield every batch of every pickled file listed in ``self.data_files``.

    Files are read from ``self.data_folder`` in list order, one at a time.
    When ``self.shuffle`` is true the batches of a file are shuffled in
    place before being yielded.
    """
    for name in self.data_files:
        path = os.path.join(self.data_folder, name)
        with open(path, 'rb') as handle:
            batches = pickle.load(handle)

        if self.shuffle:
            random.shuffle(batches)  # shuffle data before batch

        for batch in batches:
            yield batch

        # Release this file's batches before the next file is loaded.
        del batches
        gc.collect()
开发者ID:wengong-jin,项目名称:hgraph2graph,代码行数:14,
示例26: _run_test
点赞 5
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def _run_test(self, work_func: FunctionType, work_resource: object,
              jobs: int, trials: int,
              show_progress: bool=False) -> Mapping:
    """Measure ``self.map`` over ``jobs`` inputs, repeated ``trials`` times.

    Args:
        work_func: Function handed to ``self.map``.
        work_resource: Object paired with each job index to form the inputs.
        jobs: Number of inputs per trial.
        trials: Number of times the map is repeated.
        show_progress: When exactly ``True`` and ``trials > 2``, the trial
            loop is wrapped in a ``tqdm`` progress bar.

    Returns:
        A dict with ``'jobs'``, ``'trials'``, and two lists holding one entry
        per trial: ``'time'`` (elapsed seconds) and ``'blocks'`` (change in
        ``sys.getallocatedblocks()`` across the map call).
    """
    results = {
        'jobs': jobs,
        'trials': trials,
        'time': [],
        'blocks': [],
    }
    # Forcibly evaluate the inputs to prevent time/resources taken up later
    inputs = list(zip(
        [work_resource] * jobs,
        range(jobs)
    ))
    trial_iter = range(trials)
    if show_progress is True and trials > 2:
        trial_iter = tqdm(trial_iter, desc='trials')
    gc.collect()
    for _ in trial_iter:
        # Run trial of pool map function and measure it
        gc.collect()
        blocks_start = sys.getallocatedblocks()
        # perf_counter is monotonic, so system clock adjustments cannot
        # distort (or make negative) the measured duration.
        time_start = time.perf_counter()
        list(self.map(work_func, inputs))
        time_end = time.perf_counter()
        results['time'].append(time_end - time_start)
        # Get allocated blocks before garbage collection to show peak usage
        blocks_end = sys.getallocatedblocks()
        results['blocks'].append(blocks_end - blocks_start)
    return results
开发者ID:JohnStarich,项目名称:python-pool-performance,代码行数:32,
示例27: two_reservoir_problem
点赞 5
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def two_reservoir_problem():
    """Yield a ``TwoReservoirWrapper`` for ``two_reservoir.json``, then clean up."""
    import gc

    model_path = os.path.join(TEST_FOLDER, 'models', 'two_reservoir.json')
    yield TwoReservoirWrapper(model_path)

    # Everything below runs only when the generator is resumed after the yield.
    # Clean up the global model cache.
    clear_global_model_cache()
    # We force deallocation of the cache here to prevent problems using
    # process pools with pytest.
    gc.collect()
开发者ID:pywr,项目名称:pywr,代码行数:11,
示例28: two_reservoir_constrained_problem
点赞 5
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def two_reservoir_constrained_problem():
    """Yield a ``TwoReservoirWrapper`` for ``two_reservoir_constrained.json``, then clean up."""
    import gc

    model_path = os.path.join(TEST_FOLDER, 'models', 'two_reservoir_constrained.json')
    yield TwoReservoirWrapper(model_path)

    # Everything below runs only when the generator is resumed after the yield.
    # Clean up the global model cache.
    clear_global_model_cache()
    # We force deallocation of the cache here to prevent problems using
    # process pools with pytest.
    gc.collect()
开发者ID:pywr,项目名称:pywr,代码行数:11,
示例29: setup_method
点赞 5
# 需要导入模块: import gc [as 别名]
# 或者: from gc import collect [as 别名]
def setup_method(self, method):
    """Force a full garbage collection; ``method`` is not used."""
    gc.collect()
开发者ID:myhdl,项目名称:myhdl,代码行数:4,
注：本文中的gc.collect方法示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。